FELIX-3518 : Implement EventAdmin 1.3 (WiP)
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1341782 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventAdminImpl.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventAdminImpl.java
index c714afc..3b33bde 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventAdminImpl.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventAdminImpl.java
@@ -18,7 +18,9 @@
*/
package org.apache.felix.eventadmin.impl.handler;
-import org.apache.felix.eventadmin.impl.tasks.*;
+import org.apache.felix.eventadmin.impl.tasks.AsyncDeliverTasks;
+import org.apache.felix.eventadmin.impl.tasks.DefaultThreadPool;
+import org.apache.felix.eventadmin.impl.tasks.SyncDeliverTasks;
import org.osgi.framework.BundleContext;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
@@ -54,12 +56,12 @@
* @param asyncPool The asynchronous thread pool
*/
public EventAdminImpl(
- final BundleContext bundleContext,
- final DefaultThreadPool syncPool,
- final DefaultThreadPool asyncPool,
- final int timeout,
- final String[] ignoreTimeout,
- final boolean requireTopic)
+ final BundleContext bundleContext,
+ final DefaultThreadPool syncPool,
+ final DefaultThreadPool asyncPool,
+ final int timeout,
+ final String[] ignoreTimeout,
+ final boolean requireTopic)
{
checkNull(syncPool, "syncPool");
checkNull(asyncPool, "asyncPool");
@@ -109,7 +111,7 @@
*/
public void sendEvent(final Event event)
{
- m_sendManager.execute(this.getTracker().getHandlers(event), event);
+ m_sendManager.execute(this.getTracker().getHandlers(event), event, false);
}
/**
@@ -125,8 +127,8 @@
* Update the event admin with new configuration.
*/
public void update(final int timeout,
- final String[] ignoreTimeout,
- final boolean requireTopic)
+ final String[] ignoreTimeout,
+ final boolean requireTopic)
{
this.tracker.close();
this.tracker.update(ignoreTimeout, requireTopic);
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
index 40a4a69..ac0db05 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
@@ -18,10 +18,17 @@
*/
package org.apache.felix.eventadmin.impl.handler;
+import java.util.Collection;
+
import org.apache.felix.eventadmin.impl.security.PermissionsUtil;
import org.apache.felix.eventadmin.impl.util.LogWrapper;
-import org.osgi.framework.*;
-import org.osgi.service.event.*;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.Filter;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
/**
* This is a proxy for event handlers. It gets the real event handler
@@ -55,28 +62,31 @@
/** Use timeout. */
private boolean useTimeout;
- /**
- * Create an EventHandlerProxy.
+ /** Deliver async ordered. */
+ private boolean asyncOrderedDelivery;
+
+ /**
+ * Create an EventHandlerProxy.
*
* @param context The handler context
- * @param reference Reference to the EventHandler
- */
- public EventHandlerProxy(final EventHandlerTracker.HandlerContext context,
- final ServiceReference reference)
- {
- this.handlerContext = context;
- this.reference = reference;
- }
+ * @param reference Reference to the EventHandler
+ */
+ public EventHandlerProxy(final EventHandlerTracker.HandlerContext context,
+ final ServiceReference reference)
+ {
+ this.handlerContext = context;
+ this.reference = reference;
+ }
- /**
- * Update the state with current properties from the service
- * @return <code>true</code> if the handler configuration is valid.
- */
- public boolean update()
- {
- this.blacklisted = false;
- boolean valid = true;
- // First check, topic
+ /**
+ * Update the state with current properties from the service
+ * @return <code>true</code> if the handler configuration is valid.
+ */
+ public boolean update()
+ {
+ this.blacklisted = false;
+ boolean valid = true;
+ // First check, topic
final Object topicObj = reference.getProperty(EventConstants.EVENT_TOPIC);
if (topicObj instanceof String)
{
@@ -126,11 +136,11 @@
reason = "Neither of type String nor String[] : " + topicObj.getClass().getName();
}
LogWrapper.getLogger().log(
- this.reference,
- LogWrapper.LOG_WARNING,
- "Invalid EVENT_TOPICS : " + reason + " - Ignoring ServiceReference ["
- + this.reference + " | Bundle("
- + this.reference.getBundle() + ")]");
+ this.reference,
+ LogWrapper.LOG_WARNING,
+ "Invalid EVENT_TOPICS : " + reason + " - Ignoring ServiceReference ["
+ + this.reference + " | Bundle("
+ + this.reference.getBundle() + ")]");
this.topics = null;
valid = false;
}
@@ -149,39 +159,90 @@
{
valid = false;
LogWrapper.getLogger().log(
- this.reference,
- LogWrapper.LOG_WARNING,
- "Invalid EVENT_FILTER - Ignoring ServiceReference ["
- + this.reference + " | Bundle("
- + this.reference.getBundle() + ")]", e);
+ this.reference,
+ LogWrapper.LOG_WARNING,
+ "Invalid EVENT_FILTER - Ignoring ServiceReference ["
+ + this.reference + " | Bundle("
+ + this.reference.getBundle() + ")]", e);
}
}
else if ( filterObj != null )
{
valid = false;
LogWrapper.getLogger().log(
- this.reference,
- LogWrapper.LOG_WARNING,
- "Invalid EVENT_FILTER - Ignoring ServiceReference ["
- + this.reference + " | Bundle("
- + this.reference.getBundle() + ")]");
+ this.reference,
+ LogWrapper.LOG_WARNING,
+ "Invalid EVENT_FILTER - Ignoring ServiceReference ["
+ + this.reference + " | Bundle("
+ + this.reference.getBundle() + ")]");
}
}
this.filter = handlerFilter;
+ // new in 1.3 - deliver
+ this.asyncOrderedDelivery = true;
+ Object delivery = reference.getProperty(EventConstants.EVENT_DELIVERY);
+ if ( delivery instanceof Collection )
+ {
+ delivery = ((Collection)delivery).toArray(new String[((Collection)delivery).size()]);
+ }
+ if ( delivery instanceof String )
+ {
+ this.asyncOrderedDelivery = !(EventConstants.DELIVERY_ASYNC_UNORDERED.equals(delivery.toString()));
+ }
+ else if ( delivery instanceof String[] )
+ {
+ final String[] deliveryArray = (String[])delivery;
+ boolean foundOrdered = false, foundUnordered = false;
+ for(int i=0; i<deliveryArray.length; i++)
+ {
+ final String value = deliveryArray[i];
+ if ( EventConstants.DELIVERY_ASYNC_UNORDERED.equals(value) )
+ {
+ foundUnordered = true;
+ }
+ else if ( EventConstants.DELIVERY_ASYNC_ORDERED.equals(value) )
+ {
+ foundOrdered = true;
+ }
+ else
+ {
+ LogWrapper.getLogger().log(
+ this.reference,
+ LogWrapper.LOG_WARNING,
+ "Invalid EVENT_DELIVERY - Ignoring invalid value for event delivery property " + value + " of ServiceReference ["
+ + this.reference + " | Bundle("
+ + this.reference.getBundle() + ")]");
+
+ }
+ }
+ if ( !foundOrdered && foundUnordered )
+ {
+ this.asyncOrderedDelivery = false;
+ }
+ }
+ else if ( delivery != null )
+ {
+ LogWrapper.getLogger().log(
+ this.reference,
+ LogWrapper.LOG_WARNING,
+ "Invalid EVENT_DELIVERY - Ignoring event delivery property " + delivery + " of ServiceReference ["
+ + this.reference + " | Bundle("
+ + this.reference.getBundle() + ")]");
+ }
// make sure to release the handler
this.release();
return valid;
- }
+ }
- /**
- * Dispose the proxy and release the handler
- */
- public void dispose()
- {
- this.release();
- }
+ /**
+ * Dispose the proxy and release the handler
+ */
+ public void dispose()
+ {
+ this.release();
+ }
/**
* Get the event handler.
@@ -206,12 +267,12 @@
}
/**
- * Release the handler
- */
- private synchronized void release()
- {
- if ( this.handler != null )
- {
+ * Release the handler
+ */
+ private synchronized void release()
+ {
+ if ( this.handler != null )
+ {
try
{
this.handlerContext.bundleContext.ungetService(this.reference);
@@ -221,17 +282,17 @@
// event handler might be stopped - ignore
}
this.handler = null;
- }
- }
+ }
+ }
- /**
- * Get the topics of this handler.
- * If this handler matches all topics <code>null</code> is returned
- */
- public String[] getTopics()
- {
- return this.topics;
- }
+ /**
+ * Get the topics of this handler.
+ * If this handler matches all topics <code>null</code> is returned
+ */
+ public String[] getTopics()
+ {
+ return this.topics;
+ }
/**
* Check if this handler is allowed to receive the event
@@ -278,6 +339,14 @@
}
/**
+ * Should async events be delivered in order?
+ */
+ public boolean isAsyncOrderedDelivery()
+ {
+ return this.asyncOrderedDelivery;
+ }
+
+ /**
* Check the timeout configuration for this handler.
*/
private void checkTimeout(final String className)
@@ -300,43 +369,43 @@
}
/**
- * Send the event.
- */
- public void sendEvent(final Event event)
- {
- final EventHandler handlerService = this.obtain();
- if (handlerService == null)
- {
- return;
- }
+ * Send the event.
+ */
+ public void sendEvent(final Event event)
+ {
+ final EventHandler handlerService = this.obtain();
+ if (handlerService == null)
+ {
+ return;
+ }
- try
- {
- handlerService.handleEvent(event);
- }
- catch (final Throwable e)
- {
+ try
+ {
+ handlerService.handleEvent(event);
+ }
+ catch (final Throwable e)
+ {
// The spec says that we must catch exceptions and log them:
LogWrapper.getLogger().log(
- this.reference,
- LogWrapper.LOG_WARNING,
- "Exception during event dispatch [" + event + " | "
- + this.reference + " | Bundle("
- + this.reference.getBundle() + ")]", e);
- }
- }
+ this.reference,
+ LogWrapper.LOG_WARNING,
+ "Exception during event dispatch [" + event + " | "
+ + this.reference + " | Bundle("
+ + this.reference.getBundle() + ")]", e);
+ }
+ }
- /**
- * Blacklist the handler.
- */
- public void blackListHandler()
- {
+ /**
+ * Blacklist the handler.
+ */
+ public void blackListHandler()
+ {
LogWrapper.getLogger().log(
- LogWrapper.LOG_WARNING,
- "Blacklisting ServiceReference [" + this.reference + " | Bundle("
- + this.reference.getBundle() + ")] due to timeout!");
+ LogWrapper.LOG_WARNING,
+ "Blacklisting ServiceReference [" + this.reference + " | Bundle("
+ + this.reference.getBundle() + ")] due to timeout!");
this.blacklisted = true;
// we can free the handler now.
this.release();
- }
+ }
}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java
index e3653f8..ecd9f3d 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java
@@ -18,8 +18,14 @@
*/
package org.apache.felix.eventadmin.impl.tasks;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.felix.eventadmin.impl.handler.EventHandlerProxy;
import org.osgi.service.event.Event;
/**
@@ -63,24 +69,42 @@
*/
public void execute(final Collection tasks, final Event event)
{
- final Thread currentThread = Thread.currentThread();
- TaskExecuter executer = null;
- synchronized (m_running_threads )
+ final Iterator i = tasks.iterator();
+ boolean hasOrdered = false;
+ while ( i.hasNext() )
{
- TaskExecuter runningExecutor = (TaskExecuter)m_running_threads.get(currentThread);
- if ( runningExecutor != null )
+ final EventHandlerProxy task = (EventHandlerProxy)i.next();
+ if ( !task.isAsyncOrderedDelivery() )
{
- runningExecutor.add(tasks, event);
+ // do somethimg
}
else
{
- executer = new TaskExecuter( tasks, event, currentThread );
- m_running_threads.put(currentThread, executer);
+ hasOrdered = true;
}
+
}
- if ( executer != null )
+ if ( hasOrdered )
{
- m_pool.executeTask(executer);
+ final Thread currentThread = Thread.currentThread();
+ TaskExecuter executer = null;
+ synchronized (m_running_threads )
+ {
+ final TaskExecuter runningExecutor = (TaskExecuter)m_running_threads.get(currentThread);
+ if ( runningExecutor != null )
+ {
+ runningExecutor.add(tasks, event);
+ }
+ else
+ {
+ executer = new TaskExecuter( tasks, event, currentThread );
+ m_running_threads.put(currentThread, executer);
+ }
+ }
+ if ( executer != null )
+ {
+ m_pool.executeTask(executer);
+ }
}
}
@@ -106,7 +130,7 @@
{
tasks = (Object[]) m_tasks.remove(0);
}
- m_deliver_task.execute((Collection)tasks[0], (Event)tasks[1]);
+ m_deliver_task.execute((Collection)tasks[0], (Event)tasks[1], true);
synchronized ( m_running_threads )
{
running = m_tasks.size() > 0;
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
index a06f2c8..59c702c 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
@@ -101,7 +101,7 @@
* @param tasks The event handler dispatch tasks to execute
*
*/
- public void execute(final Collection tasks, final Event event)
+ public void execute(final Collection tasks, final Event event, final boolean filterAsyncUnordered)
{
final Thread sleepingThread = Thread.currentThread();
final SyncThread syncThread = sleepingThread instanceof SyncThread ? (SyncThread)sleepingThread : null;
@@ -110,62 +110,64 @@
while ( i.hasNext() )
{
final EventHandlerProxy task = (EventHandlerProxy)i.next();
-
- if ( !useTimeout(task) )
+ if ( !filterAsyncUnordered || task.isAsyncOrderedDelivery() )
{
- // no timeout, we can directly execute
- task.sendEvent(event);
- }
- else if ( syncThread != null )
- {
- // if this is a cascaded event, we directly use this thread
- // otherwise we could end up in a starvation
- final long startTime = System.currentTimeMillis();
- task.sendEvent(event);
- if ( System.currentTimeMillis() - startTime > this.timeout )
+ if ( !useTimeout(task) )
{
- task.blackListHandler();
+ // no timeout, we can directly execute
+ task.sendEvent(event);
}
- }
- else
- {
- final Rendezvous startBarrier = new Rendezvous();
- final Rendezvous timerBarrier = new Rendezvous();
- this.pool.executeTask(new Runnable()
+ else if ( syncThread != null )
{
- public void run()
+ // if this is a cascaded event, we directly use this thread
+ // otherwise we could end up in a starvation
+ final long startTime = System.currentTimeMillis();
+ task.sendEvent(event);
+ if ( System.currentTimeMillis() - startTime > this.timeout )
{
- try
- {
- // notify the outer thread to start the timer
- startBarrier.waitForRendezvous();
- // execute the task
- task.sendEvent(event);
- // stop the timer
- timerBarrier.waitForRendezvous();
- }
- catch (final IllegalStateException ise)
- {
- // this can happen on shutdown, so we ignore it
- }
+ task.blackListHandler();
}
- });
- // we wait for the inner thread to start
- startBarrier.waitForRendezvous();
-
- // timeout handling
- // we sleep for the sleep time
- // if someone wakes us up it's the finished inner task
- try
- {
- timerBarrier.waitAttemptForRendezvous(this.timeout);
}
- catch (final TimeoutException ie)
+ else
{
- // if we timed out, we have to blacklist the handler
- task.blackListHandler();
- }
+ final Rendezvous startBarrier = new Rendezvous();
+ final Rendezvous timerBarrier = new Rendezvous();
+ this.pool.executeTask(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // notify the outer thread to start the timer
+ startBarrier.waitForRendezvous();
+ // execute the task
+ task.sendEvent(event);
+ // stop the timer
+ timerBarrier.waitForRendezvous();
+ }
+ catch (final IllegalStateException ise)
+ {
+ // this can happen on shutdown, so we ignore it
+ }
+ }
+ });
+ // we wait for the inner thread to start
+ startBarrier.waitForRendezvous();
+ // timeout handling
+ // we sleep for the sleep time
+ // if someone wakes us up it's the finished inner task
+ try
+ {
+ timerBarrier.waitAttemptForRendezvous(this.timeout);
+ }
+ catch (final TimeoutException ie)
+ {
+ // if we timed out, we have to blacklist the handler
+ task.blackListHandler();
+ }
+
+ }
}
}
}