FELIX-1913 : All events are processed in a queue
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@905914 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/eventadmin/impl/pom.xml b/eventadmin/impl/pom.xml
index f45f5ce..b202312 100644
--- a/eventadmin/impl/pom.xml
+++ b/eventadmin/impl/pom.xml
@@ -17,63 +17,71 @@
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <parent>
- <groupId>org.apache.felix</groupId>
- <artifactId>felix</artifactId>
- <version>1.0.4</version>
- <relativePath>../../pom/pom.xml</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <packaging>bundle</packaging>
- <name>Apache Felix EventAdmin</name>
- <description>
- This bundle provides an implementation of the OSGi R4 EventAdmin service.
- </description>
- <version>1.0.1-SNAPSHOT</version>
- <artifactId>org.apache.felix.eventadmin</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.osgi</groupId>
- <artifactId>org.osgi.core</artifactId>
- <version>4.0.0</version>
- </dependency>
- <dependency>
- <groupId>org.osgi</groupId>
- <artifactId>org.osgi.compendium</artifactId>
- <version>4.2.0</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
+ <parent>
<groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <version>1.4.0</version>
- <extensions>true</extensions>
- <configuration>
- <instructions>
- <Bundle-SymbolicName>
- ${pom.artifactId}
- </Bundle-SymbolicName>
- <Bundle-Activator>
- ${pom.artifactId}.impl.Activator
- </Bundle-Activator>
- <Bundle-Vendor>The Apache Software Foundation</Bundle-Vendor>
- <DynamicImport-Package>
- org.osgi.service.log
- </DynamicImport-Package>
- <Import-Package>!org.osgi.service.log,*</Import-Package>
- <Export-Package>org.osgi.service.event</Export-Package>
- <Private-Package>org.apache.felix.eventadmin.impl.*</Private-Package>
- <Import-Service>
- org.osgi.service.event.EventHandler, org.osgi.service.log.LogService, org.osgi.service.log.LogReaderService
- </Import-Service>
- <Export-Service>
- org.osgi.service.event.EventAdmin
- </Export-Service>
- </instructions>
- </configuration>
- </plugin>
- </plugins>
- </build>
+ <artifactId>felix-parent</artifactId>
+ <version>1.2.0</version>
+ <relativePath>../../pom/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>bundle</packaging>
+ <name>Apache Felix EventAdmin</name>
+ <description>
+ This bundle provides an implementation of the OSGi R4 EventAdmin service.
+ </description>
+ <version>1.1.0-SNAPSHOT</version>
+ <artifactId>org.apache.felix.eventadmin</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ <version>4.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ <version>4.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>concurrent</groupId>
+ <artifactId>concurrent</artifactId>
+ <version>1.3.4</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>1.4.3</version>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Bundle-SymbolicName>
+ ${pom.artifactId}
+ </Bundle-SymbolicName>
+ <Bundle-Activator>
+ ${pom.artifactId}.impl.Activator
+ </Bundle-Activator>
+ <Bundle-Vendor>The Apache Software Foundation</Bundle-Vendor>
+ <DynamicImport-Package>
+ org.osgi.service.log
+ </DynamicImport-Package>
+ <Import-Package>!org.osgi.service.log,*</Import-Package>
+ <Export-Package>org.osgi.service.event</Export-Package>
+ <Private-Package>org.apache.felix.eventadmin.impl.*</Private-Package>
+ <Import-Service>
+ org.osgi.service.event.EventHandler, org.osgi.service.log.LogService, org.osgi.service.log.LogReaderService
+ </Import-Service>
+ <Export-Service>
+ org.osgi.service.event.EventAdmin
+ </Export-Service>
+ <Embed-Dependency>
+ concurrent
+ </Embed-Dependency>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/Activator.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/Activator.java
index 3a049cc..855a27e 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/Activator.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/Activator.java
@@ -18,35 +18,15 @@
*/
package org.apache.felix.eventadmin.impl;
-import org.apache.felix.eventadmin.impl.adapter.BundleEventAdapter;
-import org.apache.felix.eventadmin.impl.adapter.FrameworkEventAdapter;
-import org.apache.felix.eventadmin.impl.adapter.LogEventAdapter;
-import org.apache.felix.eventadmin.impl.adapter.ServiceEventAdapter;
-import org.apache.felix.eventadmin.impl.dispatch.CacheThreadPool;
-import org.apache.felix.eventadmin.impl.dispatch.DelayScheduler;
-import org.apache.felix.eventadmin.impl.dispatch.Scheduler;
-import org.apache.felix.eventadmin.impl.dispatch.TaskHandler;
+import org.apache.felix.eventadmin.impl.adapter.*;
+import org.apache.felix.eventadmin.impl.dispatch.DefaultThreadPool;
import org.apache.felix.eventadmin.impl.dispatch.ThreadPool;
-import org.apache.felix.eventadmin.impl.handler.BlacklistingHandlerTasks;
-import org.apache.felix.eventadmin.impl.handler.CacheFilters;
-import org.apache.felix.eventadmin.impl.handler.CacheTopicHandlerFilters;
-import org.apache.felix.eventadmin.impl.handler.CleanBlackList;
-import org.apache.felix.eventadmin.impl.handler.Filters;
-import org.apache.felix.eventadmin.impl.handler.HandlerTasks;
-import org.apache.felix.eventadmin.impl.handler.TopicHandlerFilters;
-import org.apache.felix.eventadmin.impl.security.CacheTopicPermissions;
-import org.apache.felix.eventadmin.impl.security.SecureEventAdminFactory;
-import org.apache.felix.eventadmin.impl.security.TopicPermissions;
-import org.apache.felix.eventadmin.impl.tasks.AsyncDeliverTasks;
-import org.apache.felix.eventadmin.impl.tasks.BlockTask;
-import org.apache.felix.eventadmin.impl.tasks.DeliverTasks;
-import org.apache.felix.eventadmin.impl.tasks.DispatchTask;
-import org.apache.felix.eventadmin.impl.tasks.SyncDeliverTasks;
+import org.apache.felix.eventadmin.impl.handler.*;
+import org.apache.felix.eventadmin.impl.security.*;
+import org.apache.felix.eventadmin.impl.tasks.*;
import org.apache.felix.eventadmin.impl.util.LeastRecentlyUsedCacheMap;
import org.apache.felix.eventadmin.impl.util.LogWrapper;
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
+import org.osgi.framework.*;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.TopicPermission;
@@ -106,15 +86,9 @@
public class Activator implements BundleActivator
{
// The thread pool used - this is a member because we need to close it on stop
- private volatile ThreadPool m_pool;
+ private volatile ThreadPool m_sync_pool;
- // The asynchronous event queue - this is a member because we need to close it on
- // stop
- private volatile TaskHandler m_asyncQueue;
-
- // The synchronous event queue - this is a member because we need to close it on
- // stop
- private volatile TaskHandler m_syncQueue;
+ private volatile ThreadPool m_async_pool;
// The actual implementation of the service - this is a member because we need to
// close it on stop. Note, security is not part of this implementation but is
@@ -162,7 +136,7 @@
// the size is reached and no cached thread is available new threads will
// be created.
final int threadPoolSize = getIntProperty(
- "org.apache.felix.eventadmin.ThreadPoolSize", context, 10, 2);
+ "org.apache.felix.eventadmin.ThreadPoolSize", context, 20, 2);
// The timeout in milliseconds - A value of less then 100 turns timeouts off.
// Any other value is the time in milliseconds granted to each EventHandler
@@ -211,24 +185,18 @@
new CleanBlackList(), topicHandlerFilters, filters,
subscribePermissions);
- // Either we need a scheduler that will trigger EventHandler blacklisting
- // (timeout >= 100) or a null object (timeout < 100)
- final Scheduler scheduler = createScheduler(timeout);
-
// Note that this uses a lazy thread pool that will create new threads on
// demand - in case none of its cached threads is free - until threadPoolSize
// is reached. Subsequently, a threadPoolSize of 2 effectively disables
// caching of threads.
- m_pool = new CacheThreadPool(threadPoolSize);
+ m_sync_pool = new DefaultThreadPool(threadPoolSize, true);
+ m_async_pool = new DefaultThreadPool(threadPoolSize > 5 ? threadPoolSize / 2 : 2, false);
- m_asyncQueue = new TaskHandler();
-
- m_syncQueue = new TaskHandler();
-
+ final DeliverTask syncExecuter = createSyncExecuters( m_sync_pool, timeout);
m_admin = createEventAdmin(context,
handlerTasks,
- createAsyncExecuters(m_asyncQueue, m_syncQueue, scheduler, m_pool),
- createSyncExecuters(m_syncQueue, scheduler, m_pool));
+ createAsyncExecuters(m_async_pool, syncExecuter),
+ syncExecuter);
// register the admin wrapped in a service factory (SecureEventAdminFactory)
// that hands-out the m_admin object wrapped in a decorator that checks
@@ -258,39 +226,17 @@
m_admin.stop();
- // This tasks will be unblocked once the queues are empty
- final BlockTask asyncShutdownBlock = new BlockTask();
-
- final BlockTask syncShutdownBlock = new BlockTask();
-
- // Now close the queues. Note that already added tasks will be delivered
- // The given shutdownTask will be executed once the queue is empty
- m_asyncQueue.close(asyncShutdownBlock);
-
- m_syncQueue.close(syncShutdownBlock);
-
m_admin = null;
- m_asyncQueue = null;
-
- m_syncQueue = null;
-
m_registration = null;
- final DispatchTask task = m_pool.getTask(Thread.currentThread(), null);
+ m_async_pool.close();
- if(null != task)
- {
- task.handover();
- }
+ m_sync_pool.close();
- asyncShutdownBlock.block();
+ m_async_pool = null;
- syncShutdownBlock.block();
-
- m_pool.close();
-
- m_pool = null;
+ m_sync_pool = null;
}
@@ -304,8 +250,8 @@
*/
protected EventAdminImpl createEventAdmin(BundleContext context,
HandlerTasks handlerTasks,
- DeliverTasks asyncExecuters,
- DeliverTasks syncExecuters)
+ DeliverTask asyncExecuters,
+ DeliverTask syncExecuters)
{
return new EventAdminImpl(handlerTasks, asyncExecuters, syncExecuters);
}
@@ -315,17 +261,10 @@
* events. Additionally, the asynchronous dispatch queue is initialized and
* activated (i.e., a thread is started via the given ThreadPool).
*/
- private DeliverTasks createAsyncExecuters(final TaskHandler handler,
- final TaskHandler handoverHandler, final Scheduler scheduler,
- final ThreadPool pool)
+ private DeliverTask createAsyncExecuters(final ThreadPool pool, final DeliverTask deliverTask)
{
// init the queue
- final AsyncDeliverTasks result = new AsyncDeliverTasks(handler,
- handoverHandler, pool);
-
- // set-up the queue for asynchronous event delivery and activate it
- // (i.e., a thread is started via the pool)
- result.execute(new DispatchTask(handler, scheduler, result));
+ final AsyncDeliverTasks result = new AsyncDeliverTasks(pool, deliverTask);
return result;
}
@@ -335,35 +274,15 @@
* Additionally, the synchronous dispatch queue is initialized and activated
* (i.e., a thread is started via the given ThreadPool).
*/
- private DeliverTasks createSyncExecuters(final TaskHandler handler,
- final Scheduler scheduler, final ThreadPool pool)
+ private DeliverTask createSyncExecuters(final ThreadPool pool, final long timeout)
{
// init the queue
- final SyncDeliverTasks result = new SyncDeliverTasks(handler, pool);
-
- // set-up the queue for synchronous event delivery and activate it
- // (i.e. a thread is started via the pool)
- result.execute(new DispatchTask(handler, scheduler, result));
+ final SyncDeliverTasks result = new SyncDeliverTasks(pool, (timeout >= 100 ? timeout : 0)); // only values below 100 are passed on as 0
return result;
}
/*
- * Returns either a new DelayScheduler with a delay of timeout or the
- * Scheduler.NULL_SCHEDULER in case timeout is < 100 in which case timeout and
- * subsequently black-listing is disabled.
- */
- private Scheduler createScheduler(final int timeout)
- {
- if(100 > timeout)
- {
- return Scheduler.NULL_SCHEDULER;
- }
-
- return new DelayScheduler(timeout);
- }
-
- /*
* Init the adapters in org.apache.felix.eventadmin.impl.adapter
*/
private void adaptEvents(final BundleContext context, final EventAdmin admin)
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/EventAdminImpl.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/EventAdminImpl.java
index 7baab6a..1d46295 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/EventAdminImpl.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/EventAdminImpl.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,22 +19,22 @@
package org.apache.felix.eventadmin.impl;
import org.apache.felix.eventadmin.impl.handler.HandlerTasks;
-import org.apache.felix.eventadmin.impl.tasks.DeliverTasks;
+import org.apache.felix.eventadmin.impl.tasks.DeliverTask;
import org.apache.felix.eventadmin.impl.tasks.HandlerTask;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
/**
- * This is the actual implementation of the OSGi R4 Event Admin Service (see the
+ * This is the actual implementation of the OSGi R4 Event Admin Service (see the
* Compendium 113 for details). The implementation uses a <tt>HandlerTasks</tt>
* in order to determine applicable <tt>EventHandler</tt> for a specific event and
* subsequently dispatches the event to the handlers via <tt>DeliverTasks</tt>.
* To do this, it uses two different <tt>DeliverTasks</tt> one for asynchronous and
* one for synchronous event delivery depending on whether its <tt>post()</tt> or
- * its <tt>send()</tt> method is called. Note that the actual work is done in the
+ * its <tt>send()</tt> method is called. Note that the actual work is done in the
* implementations of the <tt>DeliverTasks</tt>. Additionally, a stop method is
* provided that prevents subsequent events to be delivered.
- *
+ *
* @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
*/
public class EventAdminImpl implements EventAdmin
@@ -44,42 +44,42 @@
private volatile HandlerTasks m_managers;
// The asynchronous event dispatcher
- private final DeliverTasks m_postManager;
+ private final DeliverTask m_postManager;
// The synchronous event dispatcher
- private final DeliverTasks m_sendManager;
+ private final DeliverTask m_sendManager;
/**
- * The constructor of the <tt>EventAdmin</tt> implementation. The
- * <tt>HandlerTasks</tt> factory is used to determine applicable
- * <tt>EventHandler</tt> for a given event. Additionally, the two
+ * The constructor of the <tt>EventAdmin</tt> implementation. The
+ * <tt>HandlerTasks</tt> factory is used to determine applicable
+ * <tt>EventHandler</tt> for a given event. Additionally, the two
* <tt>DeliverTasks</tt> are used to dispatch the event.
- *
+ *
* @param managers The factory used to determine applicable <tt>EventHandler</tt>
* @param postManager The asynchronous event dispatcher
* @param sendManager The synchronous event dispatcher
*/
public EventAdminImpl(final HandlerTasks managers,
- final DeliverTasks postManager, final DeliverTasks sendManager)
+ final DeliverTask postManager, final DeliverTask sendManager)
{
checkNull(managers, "Managers");
checkNull(postManager, "PostManager");
checkNull(sendManager, "SendManager");
-
+
m_managers = managers;
m_postManager = postManager;
m_sendManager = sendManager;
}
-
+
/**
* Post an asynchronous event.
- *
+ *
* @param event The event to be posted by this service
- *
+ *
* @throws IllegalStateException - In case we are stopped
- *
+ *
* @see org.osgi.service.event.EventAdmin#postEvent(org.osgi.service.event.Event)
*/
public void postEvent(final Event event)
@@ -89,11 +89,11 @@
/**
* Send a synchronous event.
- *
+ *
* @param event The event to be send by this service
*
* @throws IllegalStateException - In case we are stopped
- *
+ *
* @see org.osgi.service.event.EventAdmin#sendEvent(org.osgi.service.event.Event)
*/
public void sendEvent(final Event event)
@@ -102,24 +102,24 @@
}
/**
- * This method can be used to stop the delivery of events. The m_managers is
+ * This method can be used to stop the delivery of events. The m_managers is
* replaced with a null object that throws an IllegalStateException on a call
* to <tt>createHandlerTasks()</tt>.
*/
public void stop()
{
- // replace the HandlerTasks with a null object that will throw an
+ // replace the HandlerTasks with a null object that will throw an
// IllegalStateException on a call to createHandlerTasks
m_managers = new HandlerTasks()
{
/**
- * This is a null object and this method will throw an
+ * This is a null object and this method will throw an
* IllegalStateException due to the bundle being stopped.
- *
+ *
* @param event An event that is not used.
- *
+ *
* @return This method does not return normally
- *
+ *
* @throws IllegalStateException - This is a null object and this method
* will always throw an IllegalStateException
*/
@@ -129,13 +129,13 @@
}
};
}
-
+
/*
- * This is a utility method that uses the given DeliverTasks to create a
+ * This is a utility method that uses the given DeliverTasks to create a
* dispatch tasks that subsequently is used to dispatch the given HandlerTasks.
*/
private void handleEvent(final HandlerTask[] managers,
- final DeliverTasks manager)
+ final DeliverTask manager)
{
if (0 < managers.length)
{
@@ -145,13 +145,13 @@
// events whenever they receive an event from their source.
// Service importers that call us regardless of the fact that we are
// stopped deserve an exception anyways
- manager.createTask().execute(managers);
+ manager.execute(managers);
}
}
-
+
/*
* This is a utility method that will throw a <tt>NullPointerException</tt>
- * in case that the given object is null. The message will be of the form
+ * in case that the given object is null. The message will be of the form
* "${name} + may not be null".
*/
private void checkNull(final Object object, final String name)
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/CacheThreadPool.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/CacheThreadPool.java
deleted file mode 100644
index b5d1e75..0000000
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/CacheThreadPool.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.eventadmin.impl.dispatch;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.felix.eventadmin.impl.tasks.DeliverTask;
-import org.apache.felix.eventadmin.impl.tasks.DispatchTask;
-
-/**
- * An implementation of a thread pool that uses a fixed number of cached threads
- * but will spin-off new threads as needed. The underlying assumption is that
- * threads that have been created more recently will be available sooner then older
- * threads hence, once the pool size is reached older threads will be decoupled from
- * the pool and the newly created are added to it.
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-// TODO: The least recently used method deployed is rather a hack in this case
-// it really should be refactored into a plugable strategy. However, I believe
-// it to be the best strategy in this case.
-public class CacheThreadPool implements ThreadPool
-{
- // The internal lock for this object used instead synchronized(this)
- // Note that it is used by the pooled threads created by this pool too. This is
- // the reason why it is not private. Don't use it from the outside.
- final Object m_lock = new Object();
-
- // The pooled threads
- private final PooledThread[] m_pool;
-
- // The least recently used index
- private final List m_index;
-
- // Is this pool closed i.e., do we not pool thread anymore?
- private boolean m_closed = false;
-
- /**
- * The constructor of the pool. The given size will be used as the max number of
- * pooled threads.
- *
- * @param size The max number of threads pooled at a given time.
- */
- public CacheThreadPool(final int size)
- {
- synchronized (m_lock)
- {
- m_pool = new PooledThread[size];
-
- // We assume that a list is expanded once it reaches half of its capacity
- // and it doesn't harm if the assumption is wrong.
- m_index = new ArrayList(size + 1 + (size / 2));
- }
- }
-
- /**
- * Executes the task in a thread out of the pool or a new thread if no pooled
- * thread is available. In case that the max size is reached the least recently
- * used (i.e., the longest executing) thread in the pool is decoupled and a new
- * one added to the pool that is used to execute the task.
- *
- * @param task The task to execute
- * @param callback The callback associated with the task
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.ThreadPool#execute(DispatchTask, DeliverTask)
- */
- public void execute(final DispatchTask task, final DeliverTask callback)
- {
- // Note that we associate a callback with a task via the thread used to
- // execute the task. In general, a free slot in the pool (i.e., m_pool[i] is
- // null) can be used to set-up a new thread. Also note that we need to
- // update the LRU index if we change the pool.
- synchronized(m_lock)
- {
- if(m_closed)
- {
- // We are closed hence, spin-of a new thread for the new task.
- final PooledThread result = new PooledThread();
-
- // Set-up the thread and associate the task with the callback.
- result.reset(task, callback);
-
- // release the thread immediately since we don't pool anymore.
- result.release();
-
- return;
- }
-
- // Search in the pool for a free thread.
- for (int i = 0; i < m_pool.length; i++)
- {
- // o.k. we found a free slot now set-up a new thread for it.
- if (null == m_pool[i])
- {
- m_pool[i] = new PooledThread();
-
- m_pool[i].reset(task, callback);
-
- m_index.add(new Integer(i));
-
- return;
- }
- else if (m_pool[i].available())
- {
- // we found a free thread now set it up.
- m_pool[i].reset(task, callback);
-
- final Integer idx = new Integer(i);
-
- m_index.remove(idx);
-
- m_index.add(idx);
-
- return;
- }
- }
-
- // The pool is full and no threads are available hence, spin-off a new
- // thread and add it to the pool while decoupling the least recently used
- // one. This assumes that older threads are likely to take longer to
- // become available again then younger ones.
- final int pos = ((Integer) m_index.remove(0)).intValue();
-
- m_index.add(new Integer(pos));
-
- m_pool[pos].release();
-
- m_pool[pos] = new PooledThread();
-
- m_pool[pos].reset(task, callback);
- }
- }
-
- /**
- * Look-up the callback associated with the task that the given thread is
- * currently executing or return the default value that may be <tt>null</tt>.
- *
- * @param thread The thread that is currently executing the task for which to
- * return the callback. In case the thread is not created by an instance of
- * this class the default value will be returned.
- * @param defaultCallback The value to return in case that the thread was not
- * created by an instance of this class. May be <tt>null</tt>
- * @return The callback associated with the given thread or the default value.
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.ThreadPool#getCallback(Thread, DeliverTask)
- */
- public DeliverTask getCallback(final Thread thread, final DeliverTask defaultCallback)
- {
- synchronized (m_lock)
- {
- if (thread instanceof PooledThread)
- {
- return ((PooledThread) thread).getCallback();
- }
-
- return defaultCallback;
- }
- }
-
- /**
- * Look-up the task that the given thread is currently executing or return the
- * default value that may be <tt>null</tt> in case that the thread has not been
- * created by an instance of this class.
- *
- * @param thread The thread whose currently executed task should be returned.
- * @param defaultTask The default value to be returned in case that the thread
- * was not created by this instance or doesn't currently has a task. May be
- * <tt>null</tt>
- * @return The task the given thread is currently executing or the defaultTask
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.ThreadPool#getTask(Thread, DispatchTask)
- */
- public DispatchTask getTask(Thread thread, DispatchTask defaultTask)
- {
- synchronized (m_lock)
- {
- if (thread instanceof PooledThread)
- {
- return ((PooledThread) thread).getTask();
- }
-
- return defaultTask;
- }
- }
-
- /**
- * Close the pool i.e, stop pooling threads. Note that subsequently, task will
- * still be executed but no pooling is taking place anymore.
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.ThreadPool#close()
- */
- public void close()
- {
- synchronized (m_lock)
- {
- // We are closed hence, decouple all threads from the pool
- for (int i = 0; i < m_pool.length; i++)
- {
- if (null != m_pool[i])
- {
- m_pool[i].release();
-
- m_pool[i] = null;
- }
- }
-
- m_closed = true;
- }
- }
-
- /*
- * The threads created by this pool. A PooledThread blocks until it gets a new
- * task from the pool or is released. Additionally, it is used to associate
- * the task it currently runs with its callback.
- */
- private class PooledThread extends Thread
- {
- // The current task or null if none
- private DispatchTask m_runnable = null;
-
- // The callback associated with the current task
- private DeliverTask m_callback = null;
-
- // Is this thread decoupled from the pool (i.e, may cease to exists once its
- // current task is finished)?
- private boolean m_released = false;
-
- /*
- * This will set-up the thread as a daemon and start it too. No need to call
- * its start method explicitly
- */
- PooledThread()
- {
- setDaemon(true);
-
- start();
- }
-
-
- /**
- * Call next() in a loop until next() returns null indicating that we are
- * done (i.e., decoupled from the pool) and may cease to exist.
- */
- public void run()
- {
- for (Runnable next = next(); null != next; next = next())
- {
- next.run();
-
- synchronized (m_lock)
- {
- m_runnable = null;
- }
- }
- }
-
- /*
- * Block until a new task is available or we are decoupled from the pool.
- * This will return the next task or null if we are decoupled from the pool
- */
- private DispatchTask next()
- {
- synchronized (m_lock)
- {
- while (null == m_runnable)
- {
- if (m_released)
- {
- return null;
- }
-
- try
- {
- m_lock.wait();
- } catch (InterruptedException e)
- {
- // Not needed
- }
- }
-
- return m_runnable;
- }
- }
-
- /*
- * Set-up the thread for the next task
- */
- void reset(final DispatchTask next, final DeliverTask callback)
- {
- synchronized (m_lock)
- {
- m_runnable = next;
- m_callback = callback;
- m_lock.notifyAll();
- }
- }
-
- /*
- * Return the callback associated with the current task
- */
- DeliverTask getCallback()
- {
- synchronized (m_lock)
- {
- return m_callback;
- }
- }
-
- /*
- * Return whether this thread is available (i.e., has no task and has not
- * been released) or not.
- */
- boolean available()
- {
- synchronized (m_lock)
- {
- return (null == m_runnable) && (!m_released);
- }
- }
-
- /*
- * Return the current task or null if none
- */
- DispatchTask getTask()
- {
- synchronized (m_lock)
- {
- return m_runnable;
- }
- }
-
- /*
- * Decouple this thread from the pool
- */
- void release()
- {
- synchronized (m_lock)
- {
- m_released = true;
-
- m_lock.notifyAll();
- }
- }
- }
-}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/DefaultThreadPool.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/DefaultThreadPool.java
new file mode 100644
index 0000000..f7bcd08
--- /dev/null
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/DefaultThreadPool.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.felix.eventadmin.impl.dispatch;
+
+import org.apache.felix.eventadmin.impl.tasks.SyncThread;
+import org.apache.felix.eventadmin.impl.util.LogWrapper;
+
+import EDU.oswego.cs.dl.util.concurrent.*;
+
+/**
+ * The DefaultThreadPool class implements the {@link ThreadPool} interface.
+ *
+ * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
+ */
+public class DefaultThreadPool
+ extends PooledExecutor
+ implements ThreadPool
+{
+
+ /**
+ * Create a new pool.
+ */
+ public DefaultThreadPool(final int poolSize, final boolean syncThreads)
+ {
+ super(new LinkedQueue());
+ if ( syncThreads )
+ {
+ this.setThreadFactory(new ThreadFactory()
+ {
+
+ public Thread newThread( final Runnable command )
+ {
+ final Thread thread = new SyncThread( command );
+ thread.setPriority( Thread.NORM_PRIORITY );
+ thread.setDaemon( false );
+
+ return thread;
+ }
+ });
+ }
+ else
+ {
+ this.setThreadFactory(new ThreadFactory()
+ {
+
+ public Thread newThread( final Runnable command )
+ {
+ final Thread thread = new Thread( command );
+ thread.setPriority( Thread.NORM_PRIORITY );
+ thread.setDaemon( false );
+
+ return thread;
+ }
+ });
+ }
+ super.setMinimumPoolSize(poolSize);
+ super.setMaximumPoolSize(poolSize + 10);
+ super.setKeepAliveTime(60000);
+ runWhenBlocked();
+ }
+
+ /**
+ * @see org.apache.felix.eventadmin.impl.dispatch.ThreadPool#close()
+ */
+ public void close()
+ {
+ shutdownAfterProcessingCurrentlyQueuedTasks();
+
+ try
+ {
+ awaitTerminationAfterShutdown();
+ }
+ catch (final InterruptedException ie)
+ {
+ // ignore this
+ }
+ }
+
+ /**
+ * @see org.apache.felix.eventadmin.impl.dispatch.ThreadPool#executeTask(java.lang.Runnable)
+ */
+ public void executeTask(Runnable task)
+ {
+ try
+ {
+ super.execute(task);
+ }
+ catch (Throwable t)
+ {
+ LogWrapper.getLogger().log(
+ LogWrapper.LOG_WARNING,
+ "Exception: " + t, t);
+ // ignore this
+ }
+ }
+}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/DelayScheduler.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/DelayScheduler.java
deleted file mode 100644
index 2bc70dc..0000000
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/DelayScheduler.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.eventadmin.impl.dispatch;
-
-import java.util.Date;
-import java.util.Timer;
-import java.util.TimerTask;
-
-/**
- * A simple delay scheduler that schedules tasks based on a fixed delay. Possible
- * nice values are subtracted from this delay where appropriate. Note that this
- * class uses a <tt>java.util.Timer</tt> internally that is set to be a daemon hence,
- * allows to shutdown the vm regardless but can not be stopped. The spec says that
- * a <tt>java.util.Timer</tt> without a reference to itself should go away eventually
- * but no guaranties are given. It follows that once the bundle is stopped all
- * references to instances of this class should be released and this in turn will
- * allow that the timer thread goes away eventually, but this may take an arbitrary
- * amount of time.
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.Scheduler
- * @see java.util.Timer
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public class DelayScheduler implements Scheduler
-{
- // The timer used for scheduling. Note that it will not be stopped by but
- // by the vm once all references to this instance are gone (at least eventually).
- private final Timer m_timer = new Timer(true);
-
- private final int m_delay;
-
- /**
- * The constructor of the scheduler. The scheduler will use the given delay to
- * schedule tasks accordingly.
- *
- * @param delay The delay in milliseconds before a task is executed
- */
- public DelayScheduler(final int delay)
- {
- m_delay = delay;
- }
-
- /**
- * Schedule the task to execute after the given delay.
- *
- * @param task The task to schedule for execution.
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.Scheduler#schedule(java.lang.Runnable)
- */
- public void schedule(final Runnable task)
- {
- scheduleTaskWithDelay(task, m_delay);
- }
-
- /**
- * Schedule the task to execute after the given delay minus the nice.
- *
- * @param task The task to schedule for execution after delay minus nice
- * @param nice The time to subtract from the delay.
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.Scheduler#schedule(java.lang.Runnable, int)
- */
- public void schedule(final Runnable task, int nice)
- {
- scheduleTaskWithDelay(task, m_delay - nice);
- }
-
- /*
- * This method creates a new TimerTask as a wrapper around the given task
- * and calls the m_timer.schedule method with it and the current time plus the
- * delay.
- */
- private void scheduleTaskWithDelay(final Runnable task, final int delay)
- {
- m_timer.schedule(new TimerTask()
- {
- public void run()
- {
- task.run();
- }
- }, new Date(System.currentTimeMillis() + delay));
- }
-}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/Scheduler.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/Scheduler.java
deleted file mode 100644
index ae4afc2..0000000
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/Scheduler.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.eventadmin.impl.dispatch;
-
-/**
- * A simple scheduler that accepts a task and schedules its for execution at
- * its own discretion (i.e., the behavior of the actual implementor). The only
- * possible hint is a nice value that should be subtracted from any fixed scheduling
- * interval. Additionally, a null object is provided that can be used to disable
- * scheduled execution.
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public interface Scheduler
-{
- /**
- * This is a null object that can be used in case no scheduling is needed. In
- * other words tasks given to this scheduler are never executed.
- */
- public final Scheduler NULL_SCHEDULER = new Scheduler(){
- /**
- * This is a null object hence, this method does nothing.
- *
- * @param task A task that will never be run.
- */
- public void schedule(final Runnable task)
- {
- // This is a null object hence we don't do nothing.
- }
-
- /**
- * This is a null object hence, this method does nothing.
- *
- * @param task A task that will never be run.
- * @param nice A nice value that will never be used.
- */
- public void schedule(final Runnable task, final int nice)
- {
- // This is a null object hence we don't do nothing.
- }
- };
-
- /**
- * Schedule the given task for execution at a later time based on the behavior
- * of the actual implementor of this interface. Note that this may mean that
- * the task is never executed.
- *
- * @param task The task to schedule for execution.
- */
- public void schedule(final Runnable task);
-
- /**
- * Schedule the given task for execution at a later time based on the behavior
- * of the actual implementor of this interface. Note that this may mean that
- * the task is never executed. The nice value should be subtracted from any fixed
- * scheduling interval.
- *
- * @param task The task to schedule for execution.
- * @param nice A value to subtract from any fixed scheduling interval.
- */
- public void schedule(final Runnable task, final int nice);
-}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/TaskHandler.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/TaskHandler.java
deleted file mode 100644
index 69804a7..0000000
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/TaskHandler.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.eventadmin.impl.dispatch;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.felix.eventadmin.impl.tasks.HandlerTask;
-
-/**
- * This class implements the <tt>TaskQueue</tt> and the <tt>TaskProducer</tt>
- * interface. It makes the tasks added via the queue interface available via the
- * producer interface until the queue is closed and the producer returns
- * <tt>null</tt>.
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.TaskQueue
- * @see org.apache.felix.eventadmin.impl.dispatch.TaskProducer
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public class TaskHandler implements TaskQueue, TaskProducer
-{
- // The queue that is used as a lock as well
- private final List m_queue = new ArrayList();
-
- // Are we closed?
- private boolean m_closed = false;
-
- /**
- * Append the tasks to this queue in one atomic operation while preserving their
- * order.
- *
- * @param tasks The tasks to append to this queue
- *
- * @throws IllegalStateException in case that this queue is already closed
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.TaskQueue#append(HandlerTask[])
- */
- public void append(final HandlerTask[] tasks)
- {
- synchronized (m_queue)
- {
- if(m_closed)
- {
- throw new IllegalArgumentException("Queue is closed");
- }
-
- for (int i = 0; i < tasks.length; i++)
- {
- m_queue.add(tasks[i]);
- }
-
- if(!m_queue.isEmpty())
- {
- m_queue.notifyAll();
- }
- }
- }
-
- /**
- * Push the tasks to this queue in one atomic operation while preserving their
- * order.
- *
- * @param tasks The tasks to push to the front of this queue.
- *
- * @throws IllegalStateException in case that this queue is already closed
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.TaskQueue#push(HandlerTask[])
- */
- public void push(final HandlerTask[] tasks)
- {
- synchronized (m_queue)
- {
- if(m_closed)
- {
- throw new IllegalArgumentException("Queue is closed");
- }
-
- for (int i = tasks.length -1; i >= 0; i--)
- {
- m_queue.add(0, tasks[i]);
- }
-
- if(!m_queue.isEmpty())
- {
- m_queue.notifyAll();
- }
- }
- }
-
- /**
- * Close the queue. The given shutdown task will be executed once the queue is
- * empty.
- *
- * @param shutdownTask The task to execute once the queue is empty
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.TaskQueue#close(HandlerTask)
- */
- public void close(final HandlerTask shutdownTask)
- {
- synchronized(m_queue)
- {
- m_closed = true;
-
- m_queue.add(shutdownTask);
-
- m_queue.notifyAll();
- }
- }
-
- /**
- * Block until a new task is ready and is returned or no more tasks will be
- * returned.
- *
- * @return The next task or <tt>null</tt> if no more tasks will be produced
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.TaskProducer#next()
- */
- public HandlerTask next()
- {
- synchronized (m_queue)
- {
- while(!m_closed && m_queue.isEmpty())
- {
- try
- {
- m_queue.wait();
- } catch (InterruptedException e)
- {
- // Not needed
- }
- }
-
- if(!m_queue.isEmpty())
- {
- return (HandlerTask) m_queue.remove(0);
- }
-
- return null;
- }
- }
-}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/TaskProducer.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/TaskProducer.java
deleted file mode 100644
index 541c8ad..0000000
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/TaskProducer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.eventadmin.impl.dispatch;
-
-import org.apache.felix.eventadmin.impl.tasks.HandlerTask;
-
-/**
- * Instances of this interface will deliver new tasks as soon as they are available
- * while blocking in the <tt>next()</tt> call until then. Unless there won't be any
- * more tasks in which case <tt>null</tt> is returned.
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public interface TaskProducer
-{
- /**
- * Block until a new task is ready and is returned or no more tasks will be
- * returned.
- *
- * @return The next task or <tt>null</tt> if no more tasks will be produced
- */
- public HandlerTask next();
-}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/TaskQueue.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/TaskQueue.java
deleted file mode 100644
index 9ba7980..0000000
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/TaskQueue.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.eventadmin.impl.dispatch;
-
-import org.apache.felix.eventadmin.impl.tasks.HandlerTask;
-
-/**
- * This is the interface for a simple queue that allows to append or push arrays
- * of tasks to it. The elements of such an array are added atomically (i.e, they
- * are in the same order one after the other in the queue) either at the end or the
- * front of the queue. Additionally, the queue can be closed.
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public interface TaskQueue
-{
- /**
- * Append the tasks to this queue in one atomic operation while preserving their
- * order.
- *
- * @param tasks The tasks to append to this queue
- *
- * @throws IllegalStateException in case that this queue is already closed
- */
- public void append(HandlerTask[] tasks);
-
- /**
- * Push the tasks to this queue in one atomic operation while preserving their
- * order.
- *
- * @param tasks The tasks to push to the front of this queue.
- *
- * @throws IllegalStateException in case that this queue is already closed
- */
- public void push(HandlerTask[] tasks);
-
- /**
- * Close the queue. The given callback will be executed once the queue is empty.
- *
- * @param shutdownTask The task to execute once the queue is empty
- */
- public void close(final HandlerTask shutdownTask);
-}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/ThreadPool.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/ThreadPool.java
index c8c3e91..0dfd3e8 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/ThreadPool.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/dispatch/ThreadPool.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,59 +18,28 @@
*/
package org.apache.felix.eventadmin.impl.dispatch;
-import org.apache.felix.eventadmin.impl.tasks.DeliverTask;
-import org.apache.felix.eventadmin.impl.tasks.DispatchTask;
/**
* A ThreadPool interface that allows to execute tasks using pooled threads in order
* to ease the thread creation overhead and additionally, to associate a callback
- * with the thread that executes the task. Subsequently, the callback for a given
+ * with the thread that executes the task. Subsequently, the callback for a given
* thread can be asked from instances of this class. Finally, the currently executed
* task of a thread created by this pool can be retrieved as well. The look-up
* methods accept plain thread objects and will return given default values in case
* that the specific threads have not been created by this pool. Note that a closed
- * pool should still execute new tasks but stop pooling threads.
- *
+ * pool should still execute new tasks but stop pooling threads.
+ *
* @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
*/
public interface ThreadPool
{
/**
- * Execute the task in a free thread or create a new one. The given callback
- * will be associated with the executing thread as long as it is executed.
- *
+ * Execute the task in a free thread or create a new one. The task is a plain
+ * <tt>Runnable</tt>; no callback is associated with the executing thread.
+ *
* @param task The task to execute
- * @param callback The callback that will be associated with the executing thread
- * or <tt>null</tt> if none.
*/
- public void execute(final DispatchTask task, final DeliverTask callback);
-
- /**
- * Look-up the callback associated with the task that the given thread is
- * currently executing or return the default value that may be <tt>null</tt>.
- *
- * @param thread The thread that is currently executing the task for which to
- * return the callback. In case the thread is not created by an instance of
- * this class the default value will be returned.
- * @param defaultCallback The value to return in case that the thread was not
- * created by an instance of this class. May be <tt>null</tt>
- * @return The callback associated with the given thread or the default value.
- */
- public DeliverTask getCallback(final Thread thread,
- final DeliverTask defaultCallback);
-
- /**
- * Look-up the task that the given thread is currently executing or return the
- * default value that may be <tt>null</tt> in case that the thread has not been
- * created by an instance of this class.
- *
- * @param thread The thread whose currently executed task should be returned.
- * @param defaultTask The default value to be returned in case that the thread
- * was not created by this instance or doesn't currently has a task. May be
- * <tt>null</tt>
- * @return The task the given thread is currently executing or the defaultTask
- */
- public DispatchTask getTask(final Thread thread, final DispatchTask defaultTask);
+ public void executeTask(final Runnable task);
/**
* Close the pool i.e, stop pooling threads. Note that subsequently, task will
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/Filters.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/Filters.java
index 3dc1899..f7f56cc 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/Filters.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/Filters.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,15 +20,13 @@
import java.util.Dictionary;
-import org.osgi.framework.Filter;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceReference;
+import org.osgi.framework.*;
/**
* The factory for <tt>Filter</tt> objects. Additionally, two null filter objects
* are provided that either always return <tt>true</tt> or <tt>false</tt>,
* respectively.
- *
+ *
* @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
*/
public interface Filters
@@ -36,12 +34,12 @@
/**
* A null filter object that matches any given service reference.
*/
- public static final Filter TRUE_FILTER = new Filter()
+ Filter TRUE_FILTER = new Filter()
{
/**
* This is a null object that always returns <tt>true</tt>.
- *
+ *
* @param reference An unused service reference
* @return <tt>true</tt>
*/
@@ -52,7 +50,7 @@
/**
* This is a null object that always returns <tt>true</tt>.
- *
+ *
* @param dictionary An unused dictionary
* @return <tt>true</tt>
*/
@@ -63,7 +61,7 @@
/**
* This is a null object that always returns <tt>true</tt>.
- *
+ *
* @param dictionary An unused dictionary.
* @return <tt>true</tt>
*/
@@ -74,14 +72,14 @@
};
/**
- * A null filter object that does not match any given service reference.
+ * A null filter object that does not match any given service reference.
*/
- public static final Filter FALSE_FILTER = new Filter()
+ Filter FALSE_FILTER = new Filter()
{
/**
* This is a null object that always returns <tt>false</tt>.
- *
+ *
* @param reference An unused reference.
* @return <tt>false</tt>
*/
@@ -92,7 +90,7 @@
/**
* This is a null object that always returns <tt>false</tt>.
- *
+ *
* @param dictionary An unused dictionary
* @return <tt>false</tt>
*/
@@ -103,7 +101,7 @@
/**
* This is a null object that always returns <tt>false</tt>.
- *
+ *
* @param dictionary An unused dictionary.
* @return <tt>false</tt>
*/
@@ -116,14 +114,14 @@
/**
* Create a filter for the given filter string or return the nullFilter in case
* the string is <tt>null</tt>.
- *
+ *
* @param filter The filter as a string
* @param nullFilter The default value to return if filter is <tt>null</tt>
- * @return The <tt>Filter</tt> of the filter string or the nullFilter if the
+ * @return The <tt>Filter</tt> of the filter string or the nullFilter if the
* filter string was null
* @throws InvalidSyntaxException if <tt>BundleContext.createFilter()</tt>
* throws an <tt>InvalidSyntaxException</tt>
*/
- public Filter createFilter(final String filter, final Filter nullFilter)
+ Filter createFilter(final String filter, final Filter nullFilter)
throws InvalidSyntaxException;
}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/HandlerTasks.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/HandlerTasks.java
index ea7b794..2d49159 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/HandlerTasks.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/HandlerTasks.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,7 +24,7 @@
/**
* The factory for event handler tasks. Implementations of this interface can be
* used to create tasks that handle the delivery of events to event handlers.
- *
+ *
* @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
*/
public interface HandlerTasks
@@ -32,10 +32,10 @@
/**
* Create the handler tasks for the event. All matching event handlers must
* be determined and delivery tasks for them returned.
- *
+ *
* @param event The event for which' handlers delivery tasks must be created
- *
+ *
* @return A delivery task for each handler that matches the given event
*/
- public HandlerTask[] createHandlerTasks(final Event event);
+ HandlerTask[] createHandlerTasks(final Event event);
}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/TopicHandlerFilters.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/TopicHandlerFilters.java
index 5ee388a..c55c742 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/TopicHandlerFilters.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/TopicHandlerFilters.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,7 +20,7 @@
/**
* The factory for <tt>EventHandler</tt> filters based on a certain topic.
- *
+ *
* @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
*/
public interface TopicHandlerFilters
@@ -28,11 +28,11 @@
/**
* Create a filter that will match all <tt>EventHandler</tt> services that match
* the given topic.
- *
+ *
* @param topic The topic to match
- *
- * @return A filter that will match all <tt>EventHandler</tt> services for
+ *
+ * @return A filter that will match all <tt>EventHandler</tt> services for
* the given topic.
*/
- public String createFilterForTopic(final String topic);
+ String createFilterForTopic(final String topic);
}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java
index b6f2a65..2d09352 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,124 +18,113 @@
*/
package org.apache.felix.eventadmin.impl.tasks;
-import org.apache.felix.eventadmin.impl.dispatch.TaskQueue;
+import java.util.*;
+
import org.apache.felix.eventadmin.impl.dispatch.ThreadPool;
/**
- * This class does the actual work of the asynchronous event dispatch.
- *
- * <p>It serves two purposes: first, it will append tasks to its queue hence,
- * asynchronous event delivery is executed - second, it will set up a given dispatch
- * task with its <tt>ThreadPool</tt> in a way that it is associated with a
- * <tt>DeliverTask</tt> that will block in case the thread hits the
- * <tt>SyncDeliverTasks</tt>.
- * </p>
- * In other words, if the asynchronous event dispatching thread is used to send a
- * synchronous event then it will spin-off a new asynchronous dispatching thread
- * while the former waits for the synchronous event to be delivered and then return
- * to its <tt>ThreadPool</tt>.
- *
+ * This class does the actual work of the asynchronous event dispatch.
+ *
* @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
*/
-public class AsyncDeliverTasks implements DeliverTasks, HandoverTask, DeliverTask
+public class AsyncDeliverTasks implements DeliverTask
{
- // The asynchronous event delivery queue
- private final TaskQueue m_queue;
-
- // The synchronous event delivery queue needed in case that the asynchronous
- // event dispatching thread is used to send a synchronous event. This is a
- // private member and only default because it is used in an inner class (for
- // performance reasons)
- final TaskQueue m_handoverQueue;
-
- // The thread pool to use to spin-off new threads
+ /** The thread pool to use to spin-off new threads. */
private final ThreadPool m_pool;
-
+
+ /** The deliver task for actually delivering the events. This
+ * is the sync deliver tasks as this has all the code for timeout
+ * handling etc.
+ */
+ private final DeliverTask m_deliver_task;
+
+ /** A map of running threads currently delivering async events. */
+ private final Map m_running_threads = new HashMap();
+
/**
- * The constructor of the class that will use the asynchronous queue to append
- * event dispatch handlers. Furthermore, a second queue is used to append
- * the events in case that the asynchronous event dispatching thread is used to
- * send a synchronous event - in this case the given <tt>ThreadPool</tt> is used
- * to spin-off a new asynchronous event dispatching thread while the former waits
- * for the synchronous event to be delivered.
- *
- * @param queue The asynchronous event queue
- * @param handoverQueue The synchronous event queue, to be used in case that the
- * asynchronous event dispatching thread is used to send a synchronous event
- * @param pool The thread pool used to spin-off new asynchronous event
- * dispatching threads in case of timeout or that the asynchronous event
+ * The constructor of the class that will deliver events asynchronously.
+ *
+ * @param pool The thread pool used to spin-off new asynchronous event
+ * dispatching threads in case of timeout or that the asynchronous event
* dispatching thread is used to send a synchronous event
+ * @param deliverTask The deliver tasks for dispatching the event.
*/
- public AsyncDeliverTasks(final TaskQueue queue, final TaskQueue handoverQueue,
- final ThreadPool pool)
+ public AsyncDeliverTasks(final ThreadPool pool, final DeliverTask deliverTask)
{
- m_queue = queue;
-
- m_handoverQueue = handoverQueue;
-
m_pool = pool;
+ m_deliver_task = deliverTask;
}
-
+
/**
- * Return a <tt>DeliverTask</tt> that can be used to execute asynchronous event
- * dispatch.
- *
- * @return A task that can be used to execute asynchronous event dispatch
- *
- * @see org.apache.felix.eventadmin.impl.tasks.DeliverTasks#createTask()
- */
- public DeliverTask createTask()
- {
- return this;
- }
-
- /**
- * Execute asynchronous event dispatch.
- *
- * @param tasks The event dispatch tasks to execute
- *
+ * This does not block an unrelated thread used to send a synchronous event.
+ *
+ * @param tasks The event handler dispatch tasks to execute
+ *
* @see org.apache.felix.eventadmin.impl.tasks.DeliverTask#execute(org.apache.felix.eventadmin.impl.tasks.HandlerTask[])
*/
public void execute(final HandlerTask[] tasks)
{
- m_queue.append(tasks);
- }
-
- /**
- * Execute the handover in case of timeout or that the asynchronous event
- * dispatching thread is used to send a synchronous event.
- *
- * @param task The task to set-up in a new thread
- *
- * @see org.apache.felix.eventadmin.impl.tasks.HandoverTask#execute(org.apache.felix.eventadmin.impl.tasks.DispatchTask)
- */
- public void execute(final DispatchTask task)
- {
- // This will spin-off a new thread using the thread pool and set it up with
- // the given task. Additionally, the thread is associated with a callback
- // that will handover (i.e., yet again call this method) and append the
- // tasks given to to the m_handoverQueue (i.e., the synchronous queue). This
- // will happen in case that the current asynchronous thread is used to
- // send a synchronous event.
- m_pool.execute(task, new DeliverTask()
+ final Thread currentThread = Thread.currentThread();
+ TaskExecuter executer = null;
+ synchronized (m_running_threads )
{
- public void execute(final HandlerTask[] managers)
+ TaskExecuter runningExecutor = (TaskExecuter)m_running_threads.get(currentThread);
+ if ( runningExecutor != null )
{
- final BlockTask waitManager = new BlockTask();
-
- final HandlerTask[] newmanagers = new HandlerTask[managers.length + 1];
-
- System.arraycopy(managers, 0, newmanagers, 0,
- managers.length);
-
- newmanagers[managers.length] = waitManager;
-
- m_handoverQueue.append(newmanagers);
-
- task.handover();
-
- waitManager.block();
+ runningExecutor.add(tasks);
}
- });
+ else
+ {
+ executer = new TaskExecuter( tasks, currentThread );
+ m_running_threads.put(currentThread, executer);
+ }
+ }
+ if ( executer != null )
+ {
+ m_pool.executeTask(executer);
+ }
+ }
+
+    private final class TaskExecuter implements Runnable
+    {
+        /** The pending batches of handler tasks, in posting order. */
+        private final List m_tasks = new LinkedList();
+        private final Object m_key;
+
+        public TaskExecuter(final HandlerTask[] tasks, final Object key)
+        {
+            m_key = key;
+            m_tasks.add(tasks);
+        }
+
+        /** Deliver all batches; always unregister, so a failure leaves no dead executer. */
+        public void run()
+        {
+            boolean done = false;
+            try
+            {
+                while ( !done )
+                {
+                    final HandlerTask[] tasks;
+                    synchronized ( m_tasks ) { tasks = (HandlerTask[]) m_tasks.remove(0); }
+                    m_deliver_task.execute(tasks);
+                    synchronized ( m_running_threads )
+                    {
+                        done = m_tasks.isEmpty();
+                        if ( done ) { m_running_threads.remove(m_key); }
+                    }
+                }
+            }
+            finally
+            {
+                synchronized ( m_running_threads ) { if ( !done ) { m_running_threads.remove(m_key); } }
+            }
+        }
+
+        /** Queue a further batch; execute() calls this holding m_running_threads. */
+        public void add(final HandlerTask[] tasks)
+        {
+            synchronized ( m_tasks ) { m_tasks.add(tasks); }
+        }
+    }
}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlockTask.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlockTask.java
deleted file mode 100644
index 0c4f16a..0000000
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlockTask.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.eventadmin.impl.tasks;
-
-/**
- * This task will can be used to block a thread that subsequently will be unblocked
- * once the task is executed.
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public class BlockTask implements HandlerTask
-{
- // The internal lock for this object used instead synchronized(this)
- private final Object m_lock = new Object();
-
- // Has this task not been executed?
- private boolean m_blocking = true;
-
- /**
- * Unblock possibly blocking threads.
- *
- * @see org.apache.felix.eventadmin.impl.tasks.HandlerTask#execute()
- */
- public void execute()
- {
- synchronized (m_lock)
- {
- m_blocking = false;
- m_lock.notifyAll();
- }
- }
-
- /**
- * This methods does nothing since we only need this task to block and unblock
- * threads.
- *
- * @see org.apache.felix.eventadmin.impl.tasks.HandlerTask#blackListHandler()
- */
- public void blackListHandler()
- {
- // This method does nothing since we only need this task to block and
- // unblock threads
- }
-
- /**
- * Block the calling thread until this task is executed.
- */
- public void block()
- {
- synchronized (m_lock)
- {
- while (m_blocking)
- {
- try
- {
- m_lock.wait();
- } catch (InterruptedException e)
- {
-
- }
- }
- }
- }
-
-}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/DeliverTasks.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/DeliverTasks.java
deleted file mode 100644
index b9fa92c..0000000
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/DeliverTasks.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.eventadmin.impl.tasks;
-
-/**
- * A factory that creates <tt>DeliverTask</tt> objects.
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public interface DeliverTasks
-{
- /**
- * Create a deliver task.
- *
- * @return A <tt>DeliverTask</tt> that can be used to dispatch event handler
- * tasks
- */
- public DeliverTask createTask();
-}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/DispatchTask.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/DispatchTask.java
deleted file mode 100644
index 0197d03..0000000
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/DispatchTask.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.eventadmin.impl.tasks;
-
-import org.apache.felix.eventadmin.impl.dispatch.Scheduler;
-import org.apache.felix.eventadmin.impl.dispatch.TaskProducer;
-
-/**
- * This class is the core of the event dispatching (for both, synchronous and
- * asynchronous). It implements handover and timeout capabilities.
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public class DispatchTask implements Runnable
-{
- // A null scheduler object that does not schedule given tasks
- private static final Scheduler NULL_SCHEDULER = new Scheduler()
- {
- /**
- * This is a null object and will do nothing with the given task
- *
- * @param task A task that is not used
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.Scheduler#schedule(java.lang.Runnable)
- */
- public void schedule(final Runnable task)
- {
- // This is a null object and will do nothing with the given task
- }
-
- /**
- * This is a null object and will do nothing with the given task
- *
- * @param task A task that is not used
- * @parma nice A value that is not used
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.Scheduler#schedule(java.lang.Runnable, int)
- */
- public void schedule(final Runnable task, final int nice)
- {
- // This is a null object and will do nothing with the given task
- }
- };
-
- // A null producer object that will return null on any call to next()
- private static final TaskProducer NULL_PRODUCER = new TaskProducer()
- {
- /**
- * This is a null object and will return <tt>null</tt>
- *
- * @return <tt>null</tt>
- *
- * @see org.apache.felix.eventadmin.impl.dispatch.TaskProducer#next()
- */
- public HandlerTask next()
- {
- return null;
- }
- };
-
- // A null handover task that will do nothing on execute
- private static final HandoverTask NULL_HANDOVER = new HandoverTask()
- {
- /**
- * This is a null object that will do nothing.
- *
- * @parma task A task that is not used
- *
- * @see org.apache.felix.eventadmin.impl.tasks.HandoverTask#execute(org.apache.felix.eventadmin.impl.tasks.DispatchTask)
- */
- public void execute(final DispatchTask task)
- {
- // This is a null object that will do nothing.
- }
- };
-
- // The internal lock for this object used instead synchronized(this)
- final Object m_lock = new Object();
-
- // The task producer (i.e., the event queue) that will be a null object if not
- // needed anymore
- private volatile TaskProducer m_producer;
-
- // The scheduler to use that will be a null object if not needed anymore
- private Scheduler m_scheduler;
-
- // The handover callback that is called on timeouts and handovers and that will
- // be a null object if not needed anymore
- private HandoverTask m_handover;
-
- // Used to blacklist on timeout
- private BlackListTask m_blackListTask = null;
-
- // Are we currently blocked (i.e., do not tick the timeout clock down)?
- private boolean m_isHolding = false;
-
- /**
- * The constructor of the object.
- *
- * @param producer The producer (i.e., the event queue) that provides the next
- * tasks
- * @param scheduler The scheduler to use for timeout actions
- * @param handover The callback to use on timeouts and handovers
- */
- public DispatchTask(final TaskProducer producer, final Scheduler scheduler,
- final HandoverTask handover)
- {
- m_producer = producer;
-
- m_scheduler = scheduler;
-
- m_handover = handover;
- }
-
- /*
- * Construct a new object from a old one.
- */
- private DispatchTask(final DispatchTask old)
- {
- this(old.m_producer, old.m_scheduler, old.m_handover);
- }
-
- /**
- * This will loop until the producer returns <tt>null</tt>. Until then the
- * returned tasks are executed.
- *
- * @see java.lang.Runnable#run()
- */
- public void run()
- {
- for (HandlerTask manager = m_producer.next(); null != manager; manager = m_producer
- .next())
- {
- synchronized (m_lock)
- {
- // Set-up the timeout
- m_blackListTask = new BlackListTask(manager);
-
- m_scheduler.schedule(m_blackListTask);
- }
-
- // HandlerTask does catch exceptions hence, we don't need to do it.
- manager.execute();
-
- synchronized (m_lock)
- {
- // release the timeout
- m_blackListTask.cancel();
- }
- }
- }
-
- /**
- * This method will trigger a callback to the handover callback and stop this
- * task.
- */
- public void handover()
- {
- synchronized (m_lock)
- {
- // release the timeout
- m_blackListTask.cancel();
-
- // spin-off a new thread
- m_handover.execute(new DispatchTask(this));
-
- stop();
- }
- }
-
- /**
- * This method stops the tasks without a handover
- */
- public void stop()
- {
- synchronized (m_lock)
- {
- // release the timeout
- m_blackListTask.cancel();
-
- m_handover = NULL_HANDOVER;
-
- m_producer = NULL_PRODUCER;
-
- m_scheduler = NULL_SCHEDULER;
- }
- }
-
- /**
- * This will pause the task (including its timeout clock) until a call to
- * <tt>resume()</tt>
- */
- public void hold()
- {
- synchronized (m_lock)
- {
- // release the timeout
- m_blackListTask.cancel();
-
- // record the time that we already used
- int pastTime = (int) (System.currentTimeMillis() - m_blackListTask
- .getTime());
-
- // spin-off a new thread
- m_handover.execute(new DispatchTask(this));
-
- // block until a call to resume()
- m_isHolding = true;
-
- while (m_isHolding)
- {
- try
- {
- m_lock.wait();
- } catch (InterruptedException e)
- {
- }
- }
-
- // restore the timeout
- m_blackListTask = new BlackListTask(m_blackListTask,
- System.currentTimeMillis() - pastTime);
-
- m_scheduler.schedule(m_blackListTask, pastTime);
- }
- }
-
- /**
- * This will let the previously hold task resume.
- */
- public void resume()
- {
- synchronized (m_lock)
- {
- m_isHolding = false;
-
- m_lock.notifyAll();
- }
- }
-
- /*
- * This is the implementation of the timeout.
- */
- private class BlackListTask implements Runnable
- {
- // Are we canceled?
- private boolean m_canceled = false;
-
- // The time we have been started
- private final long m_time;
-
- // The task we will blacklist if we are triggered
- private final HandlerTask m_manager;
-
- BlackListTask(final HandlerTask manager)
- {
- this(manager, System.currentTimeMillis());
- }
-
- BlackListTask(final HandlerTask manager, final long time)
- {
- m_manager = manager;
-
- m_time = time;
- }
-
- BlackListTask(final BlackListTask old, final long time)
- {
- this(old.m_manager, time);
- }
-
- /**
- * @return The time we have been created.
- */
- public long getTime()
- {
- return m_time;
- }
-
- /**
- * We have been triggered hence, blacklist the handler except if we are
- * already canceled
- *
- * @see java.lang.Runnable#run()
- */
- public void run()
- {
- synchronized (m_lock)
- {
- if (!m_canceled)
- {
- m_manager.blackListHandler();
-
- handover();
- }
- }
- }
-
- /**
- * Cancel the timeout
- */
- public void cancel()
- {
- synchronized (m_lock)
- {
- m_canceled = true;
- }
- }
- }
-}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java
index c099d4e..33289a0 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,7 +21,7 @@
/**
* A task that will deliver its event to its <tt>EventHandler</tt> when executed
* or blacklist the handler, respectively.
- *
+ *
* @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
*/
public interface HandlerTask
@@ -29,10 +29,15 @@
/**
* Deliver the event to the handler.
*/
- public void execute();
-
+ void execute();
+
/**
* Blacklist the handler.
*/
- public void blackListHandler();
+ void blackListHandler();
+
+ /**
+ * Returns whether the delivery of the event has finished.
+ */
+ boolean finished();
}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTaskImpl.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTaskImpl.java
index c5b3001..213620e 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTaskImpl.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTaskImpl.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -26,7 +26,7 @@
/**
* An implementation of the <tt>HandlerTask</tt> interface.
- *
+ *
* @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
*/
public class HandlerTaskImpl implements HandlerTask
@@ -40,13 +40,16 @@
// Used to blacklist the service or get the service object for the reference
private final BlacklistingHandlerTasks m_handlerTasks;
+ // Is this handler finished
+ private volatile boolean m_finished = false;
+
/**
* Construct a delivery task for the given service and event.
- *
+ *
* @param eventHandlerRef The servicereference of the handler
* @param event The event to deliver
* @param handlerTasks Used to blacklist the service or get the service object
- * for the reference
+ * for the reference
*/
public HandlerTaskImpl(final ServiceReference eventHandlerRef,
final Event event, final BlacklistingHandlerTasks handlerTasks)
@@ -63,7 +66,7 @@
*/
public void execute()
{
- // Get the service object
+ // Get the service object
final EventHandler handler = m_handlerTasks
.getEventHandler(m_eventHandlerRef);
@@ -80,7 +83,7 @@
+ m_eventHandlerRef + " | Bundle("
+ m_eventHandlerRef.getBundle() + ")]", e);
}
-
+ m_finished = true;
m_handlerTasks.ungetEventHandler(handler, m_eventHandlerRef);
}
@@ -91,4 +94,13 @@
{
m_handlerTasks.blackList(m_eventHandlerRef);
}
+
+ /**
+ * @see org.apache.felix.eventadmin.impl.tasks.HandlerTask#finished()
+ */
+ public boolean finished()
+ {
+ return m_finished;
+ }
+
}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandoverTask.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandoverTask.java
deleted file mode 100644
index c84e6f1..0000000
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandoverTask.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.eventadmin.impl.tasks;
-
-/**
- * A task that is used to handover a dispatch thread context to another thread.
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public interface HandoverTask
-{
- /**
- * Handover the context to another thread.
- *
- * @param task The context to be executed in another thread.
- */
- public void execute(final DispatchTask task);
-}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/Rendezvous.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/Rendezvous.java
new file mode 100644
index 0000000..8079a2f
--- /dev/null
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/Rendezvous.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.eventadmin.impl.tasks;
+
+import EDU.oswego.cs.dl.util.concurrent.*;
+
+/**
+ * This is a simplified version of the CyclicBarrier implementation.
+ * It provides the same methods but internally ignores the exceptions.
+ *
+ * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
+ */
+public class Rendezvous extends CyclicBarrier
+{
+ /** Flag for timedout handling. */
+ private volatile boolean timedout = false;
+
+ /**
+ * Create a barrier for exactly two parties, i.e. the two threads that
+ * have to meet at the barrier point.
+ */
+ public Rendezvous()
+ {
+ super(2);
+ }
+
+ /**
+ * see {@link CyclicBarrier#barrier()}
+ */
+ public void waitForRendezvous()
+ {
+ if ( timedout )
+ {
+ // if we have timed out, we return immediately
+ return;
+ }
+ try
+ {
+ this.barrier();
+ }
+ catch (BrokenBarrierException ignore1)
+ {
+ }
+ catch (InterruptedException ignore2)
+ {
+ }
+ }
+
+ /**
+ * see {@link CyclicBarrier#attemptBarrier(long)}
+ */
+ public void waitAttemptForRendezvous(final long timeout)
+ throws TimeoutException
+ {
+ try
+ {
+ this.attemptBarrier(timeout);
+ this.restart();
+ }
+ catch (BrokenBarrierException ignore1)
+ {
+ }
+ catch (TimeoutException te)
+ {
+ timedout = true;
+ throw te;
+ }
+ catch (InterruptedException ignore2)
+ {
+ }
+ }
+
+ public boolean isTimedOut()
+ {
+ return timedout;
+ }
+}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/ResumeTask.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/ResumeTask.java
deleted file mode 100644
index 5dfa74b..0000000
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/ResumeTask.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.eventadmin.impl.tasks;
-
-import org.apache.felix.eventadmin.impl.dispatch.ThreadPool;
-
-/**
- * A task that wakes-up a disabled <tt>DispatchTask</tt>. Additionally, it will
- * stop the currently running task.
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public class ResumeTask implements HandlerTask
-{
- // The task to wake-up on execution
- private final DispatchTask m_target;
-
- // The pool used to get the task to stop on execution
- private final ThreadPool m_pool;
-
- /**
- * @param target The task to wake-up on execution
- * @param pool The pool used to get the task to stop on execution
- */
- public ResumeTask(final DispatchTask target, final ThreadPool pool)
- {
- m_target = target;
-
- m_pool = pool;
- }
-
- /**
- * Stop the current task and wake-up the target.
- *
- * @see org.apache.felix.eventadmin.impl.tasks.HandlerTask#execute()
- */
- public void execute()
- {
- m_pool.getTask(Thread.currentThread(), null).stop();
-
- m_target.resume();
- }
-
- /**
- * This does nothing since this task is only used to wake-up disabled tasks.
- *
- * @see org.apache.felix.eventadmin.impl.tasks.HandlerTask#blackListHandler()
- */
- public void blackListHandler()
- {
- }
-}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
index 4751ca4..d9e5f4c 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,124 +18,170 @@
*/
package org.apache.felix.eventadmin.impl.tasks;
-import org.apache.felix.eventadmin.impl.dispatch.TaskQueue;
import org.apache.felix.eventadmin.impl.dispatch.ThreadPool;
+import EDU.oswego.cs.dl.util.concurrent.TimeoutException;
+
/**
* This class does the actual work of the synchronous event delivery.
+ *
+ * This is the heart of the event delivery. If an event is delivered
+ * without timeout handling, the event is directly delivered using
+ * the calling thread.
+ * If timeout handling is enabled, a new thread is taken from the
+ * thread pool and this thread is used to deliver the event.
+ * The calling thread is blocked until either the delivery is finished
+ * or the timeout occurs.
* <p><tt>
- * It serves two purposes, first it is used to select the appropriate action
- * depending on whether the sending thread is the asynchronous, the synchronous, or
- * an unrelated thread. Second, it will set up a given dispatch
- * task with its <tt>ThreadPool</tt> in a way that it is associated with a
- * <tt>DeliverTask</tt> that will push given handler tasks to the queue and
- * then wait for the tasks to be completed.
- * </tt></p>
- * In other words if an unrelated thread is used to send a synchronous event it is
- * blocked until the event is send (or a timeout occurs), if an asynchronous thread
- * is used its handover callback is called in order to spin-off a new asynchronous
- * delivery thread and the former is blocked until the events are delivered and then
- * released (or returned to its thread pool), if a synchronous thread is used its
- * task is disabled, the events are pushed to the queue and the threads continuous
- * with the delivery of the new events (as per spec). Once the new events are done
- * the thread wakes-up the disabled task and resumes to execute it.
- * <p><tt>
- * Note that in case of a timeout while a task is disabled the thread is released and
- * we spin-off a new thread that resumes the disabled task hence, this is the only
- * place were we break the semantics of the synchronous delivery. While the only one
- * to notice this is the timed-out handler - it is the fault of this handler too
- * (i.e., it blocked the dispatch for to long) but since it will not receive events
- * anymore it will not notice this semantic difference except that it might not see
- * events it already sent before.
+ * Note that in case of a timeout while a task is disabled the thread
+ * is released and we spin-off a new thread that resumes the disabled
+ * task hence, this is the only place where we break the semantics of
+ * the synchronous delivery. While the only one to notice this is the
+ * timed-out handler - it is the fault of this handler too (i.e., it
+ * blocked the dispatch for too long) but since it will not receive
+ * events anymore it will not notice this semantic difference except
+ * that it might not see events it already sent before.
* </tt></pre>
+ *
+ * If during an event delivery a new event should be delivered from
+ * within the event handler, the timeout handler is stopped for the
+ * delivery time of the inner event!
+ *
* @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
*/
-public class SyncDeliverTasks implements DeliverTasks, HandoverTask, DeliverTask
+public class SyncDeliverTasks implements DeliverTask
{
- // The synchronous event queue
- final TaskQueue m_queue;
-
- // The thread pool used to spin-off new threads and associate callbacks with
- // tasks
+ /** The thread pool used to spin-off new threads. */
final ThreadPool m_pool;
- /**
- * @param queue The synchronous event queue
- * @param pool The thread pool used to spin-off new threads and associate
- * callbacks with tasks
- */
- public SyncDeliverTasks(final TaskQueue queue, final ThreadPool pool)
- {
- m_queue = queue;
+ /** The timeout for event handlers, 0 = disabled. */
+ final long m_timeout;
+ /**
+ * Construct a new sync deliver tasks.
+ * @param pool The thread pool used to spin-off new threads.
+ * @param timeout The timeout for an event handler, 0 = disabled
+ */
+ public SyncDeliverTasks(final ThreadPool pool, final long timeout)
+ {
m_pool = pool;
+ m_timeout = timeout;
}
/**
- * This will select the appropriate action depending on whether the sending
- * thread is the asynchronous, the synchronous, or an unrelated thread.
- *
- * @return The appropriate action
- *
- * @see org.apache.felix.eventadmin.impl.tasks.DeliverTasks#createTask()
- */
- public DeliverTask createTask()
- {
- return m_pool.getCallback(Thread.currentThread(), this);
- }
-
- /**
- * This blocks an unrelated thread used to send a synchronous event until the
- * event is send (or a timeout occurs).
- *
+ * Decides whether timeout handling should be used for the given
+ * task.
* @param tasks The event handler dispatch tasks to execute
- *
- * @see org.apache.felix.eventadmin.impl.tasks.DeliverTask#execute(org.apache.felix.eventadmin.impl.tasks.HandlerTask[])
+ */
+ private boolean useTimeout(final HandlerTask task)
+ {
+ return m_timeout > 0;
+ }
+
+ /**
+ * This blocks an unrelated thread used to send a synchronous event until the
+ * event is sent (or a timeout occurs).
+ *
+ * @param tasks The event handler dispatch tasks to execute
+ *
+ * @see org.apache.felix.eventadmin.impl.tasks.DeliverTask#execute(HandlerTask[])
*/
public void execute(final HandlerTask[] tasks)
{
- final BlockTask waitManager = new BlockTask();
-
- final HandlerTask[] newtasks = new HandlerTask[tasks.length + 1];
-
- System.arraycopy(tasks, 0, newtasks, 0, tasks.length);
-
- newtasks[tasks.length] = waitManager;
-
- m_queue.append(newtasks);
-
- waitManager.block();
- }
-
- /**
- * Set up a given dispatch task with its <tt>ThreadPool</tt> in a way that it is
- * associated with a <tt>DeliverTask</tt> that will push given handler tasks to
- * the queue and then wait for the tasks to be completed.
- *
- * @param task The task to set-up
- *
- * @see org.apache.felix.eventadmin.impl.tasks.HandoverTask#execute(org.apache.felix.eventadmin.impl.tasks.DispatchTask)
- */
- public void execute(final DispatchTask task)
- {
- m_pool.execute(task, new DeliverTask()
+ final Thread sleepingThread = Thread.currentThread();
+ SyncThread syncThread = sleepingThread instanceof SyncThread ? (SyncThread)sleepingThread : null;
+ final Rendezvous cascadingBarrier = new Rendezvous();
+ // check if this is a cascaded event sending
+ if ( syncThread != null )
{
- public void execute(final HandlerTask[] managers)
+ // wake up outer thread
+ if ( syncThread.isTopMostHandler() )
{
- final ResumeTask resumeManager = new ResumeTask(
- task, m_pool);
-
- final HandlerTask[] newmanagers = new HandlerTask[managers.length + 1];
-
- System.arraycopy(managers, 0, newmanagers, 0,
- managers.length);
-
- newmanagers[managers.length] = resumeManager;
-
- m_queue.push(newmanagers);
-
- task.hold();
+ syncThread.getTimerBarrier().waitForRendezvous();
}
- });
+ syncThread.innerEventHandlingStart();
+ }
+
+ for(int i=0;i<tasks.length;i++)
+ {
+ final HandlerTask task = tasks[i];
+
+ if ( !useTimeout(task) )
+ {
+ // no timeout, we can directly execute
+ task.execute();
+ }
+ else
+ {
+ final Rendezvous startBarrier = new Rendezvous();
+ final Rendezvous timerBarrier = new Rendezvous();
+ m_pool.executeTask(new Runnable()
+ {
+ public void run()
+ {
+ final SyncThread myThread = (SyncThread)Thread.currentThread();
+ myThread.init(timerBarrier, cascadingBarrier);
+ try
+ {
+ // notify the outer thread to start the timer
+ startBarrier.waitForRendezvous();
+ // execute the task
+ task.execute();
+ // stop the timer
+ timerBarrier.waitForRendezvous();
+ }
+ finally
+ {
+ myThread.cleanup();
+ }
+ }
+ });
+ // we wait for the inner thread to start
+ startBarrier.waitForRendezvous();
+
+ // timeout handling
+ boolean finished;
+ long sleepTime = m_timeout;
+ do {
+ finished = true;
+ // we sleep for the sleep time
+ // if someone wakes us up it's the inner task who either
+ // has finished or a cascading event
+ long startTime = System.currentTimeMillis();
+ try
+ {
+ timerBarrier.waitAttemptForRendezvous(sleepTime);
+ // if this occurs no timeout occurred or we have a cascaded event
+ if ( !task.finished() )
+ {
+ // adjust remaining sleep time
+ sleepTime = m_timeout - (System.currentTimeMillis() - startTime);
+ cascadingBarrier.waitForRendezvous();
+ finished = task.finished();
+ }
+ }
+ catch (TimeoutException ie)
+ {
+ // if we timed out, we have to blacklist the handler
+ task.blackListHandler();
+ }
+ }
+ while ( !finished );
+
+ }
+ }
+ // wake up outer thread again if cascaded
+
+ if ( syncThread != null )
+ {
+ syncThread.innerEventHandlingStopped();
+ if ( syncThread.isTopMostHandler() )
+ {
+ if ( !syncThread.getTimerBarrier().isTimedOut() ) {
+ syncThread.getCascadingBarrier().waitForRendezvous();
+ }
+ }
+ }
+
}
}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncThread.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncThread.java
new file mode 100644
index 0000000..bb174cc
--- /dev/null
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncThread.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.eventadmin.impl.tasks;
+
+/**
+ * Thread class used for sending events synchronously.
+ * It handles cascaded synchronous events: it keeps the timer and cascading
+ * barriers passed to init() and a counter of the current nesting level.
+ *
+ * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
+ */
+public class SyncThread extends Thread
+{
+ // NOTE(review): ++/-- on the volatile counter are not atomic; presumably only this thread updates it - confirm against callers.
+ /** Counter to track the nesting level; starts at 0, which isTopMostHandler() treats as top-most. */
+ private volatile int counter;
+
+ /** The barriers for synchronizing; assigned by init() and reset to null by cleanup(). */
+ private volatile Rendezvous timerBarrier;
+ private volatile Rendezvous cascadingBarrier;
+
+ /**
+ * Constructor used by the thread pool; passes the given target on to Thread.
+ */
+ public SyncThread(Runnable target)
+ {
+ super(target);
+ }
+ /** Stores the given timer and cascading barriers on this thread until cleanup() is called. */
+ public void init(final Rendezvous timerBarrier, final Rendezvous cascadingBarrier)
+ {
+ this.timerBarrier = timerBarrier;
+ this.cascadingBarrier = cascadingBarrier;
+ }
+ /** Drops both barrier references by setting them to null; the nesting counter is not touched. */
+ public void cleanup()
+ {
+ this.timerBarrier = null;
+ this.cascadingBarrier = null;
+ }
+ /** Returns the timer barrier stored by init(), or null if none is currently set. */
+ public Rendezvous getTimerBarrier()
+ {
+ return timerBarrier;
+ }
+ /** Returns the cascading barrier stored by init(), or null if none is currently set. */
+ public Rendezvous getCascadingBarrier()
+ {
+ return cascadingBarrier;
+ }
+ /** Returns true while the nesting counter is 0, i.e. start and stopped calls are balanced. */
+ public boolean isTopMostHandler()
+ {
+ return counter == 0;
+ }
+ /** Increments the nesting counter by one. */
+ public void innerEventHandlingStart()
+ {
+ counter++;
+ }
+ /** Decrements the nesting counter by one. */
+ public void innerEventHandlingStopped()
+ {
+ counter--;
+ }
+}