FELIX-3518 : Implement EventAdmin 1.3 (WiP)

git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1341782 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventAdminImpl.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventAdminImpl.java
index c714afc..3b33bde 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventAdminImpl.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventAdminImpl.java
@@ -18,7 +18,9 @@
  */
 package org.apache.felix.eventadmin.impl.handler;
 
-import org.apache.felix.eventadmin.impl.tasks.*;
+import org.apache.felix.eventadmin.impl.tasks.AsyncDeliverTasks;
+import org.apache.felix.eventadmin.impl.tasks.DefaultThreadPool;
+import org.apache.felix.eventadmin.impl.tasks.SyncDeliverTasks;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
@@ -54,12 +56,12 @@
      * @param asyncPool The asynchronous thread pool
      */
     public EventAdminImpl(
-            final BundleContext bundleContext,
-            final DefaultThreadPool syncPool,
-            final DefaultThreadPool asyncPool,
-            final int timeout,
-            final String[] ignoreTimeout,
-            final boolean requireTopic)
+                    final BundleContext bundleContext,
+                    final DefaultThreadPool syncPool,
+                    final DefaultThreadPool asyncPool,
+                    final int timeout,
+                    final String[] ignoreTimeout,
+                    final boolean requireTopic)
     {
         checkNull(syncPool, "syncPool");
         checkNull(asyncPool, "asyncPool");
@@ -109,7 +111,7 @@
      */
     public void sendEvent(final Event event)
     {
-        m_sendManager.execute(this.getTracker().getHandlers(event), event);
+        m_sendManager.execute(this.getTracker().getHandlers(event), event, false);
     }
 
     /**
@@ -125,8 +127,8 @@
      * Update the event admin with new configuration.
      */
     public void update(final int timeout,
-            final String[] ignoreTimeout,
-            final boolean requireTopic)
+                    final String[] ignoreTimeout,
+                    final boolean requireTopic)
     {
         this.tracker.close();
         this.tracker.update(ignoreTimeout, requireTopic);
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
index 40a4a69..ac0db05 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
@@ -18,10 +18,17 @@
  */
 package org.apache.felix.eventadmin.impl.handler;
 
+import java.util.Collection;
+
 import org.apache.felix.eventadmin.impl.security.PermissionsUtil;
 import org.apache.felix.eventadmin.impl.util.LogWrapper;
-import org.osgi.framework.*;
-import org.osgi.service.event.*;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.Filter;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
 
 /**
  * This is a proxy for event handlers. It gets the real event handler
@@ -55,28 +62,31 @@
     /** Use timeout. */
     private boolean useTimeout;
 
-	/**
-	 * Create an EventHandlerProxy.
+    /** Deliver async ordered. */
+    private boolean asyncOrderedDelivery;
+
+    /**
+     * Create an EventHandlerProxy.
      *
      * @param context The handler context
-	 * @param reference Reference to the EventHandler
-	 */
-	public EventHandlerProxy(final EventHandlerTracker.HandlerContext context,
-	        final ServiceReference reference)
-	{
-	    this.handlerContext = context;
-		this.reference = reference;
-	}
+     * @param reference Reference to the EventHandler
+     */
+    public EventHandlerProxy(final EventHandlerTracker.HandlerContext context,
+                    final ServiceReference reference)
+    {
+        this.handlerContext = context;
+        this.reference = reference;
+    }
 
-	/**
-	 * Update the state with current properties from the service
-	 * @return <code>true</code> if the handler configuration is valid.
-	 */
-	public boolean update()
-	{
-	    this.blacklisted = false;
-		boolean valid = true;
-		// First check, topic
+    /**
+     * Update the state with current properties from the service
+     * @return <code>true</code> if the handler configuration is valid.
+     */
+    public boolean update()
+    {
+        this.blacklisted = false;
+        boolean valid = true;
+        // First check, topic
         final Object topicObj = reference.getProperty(EventConstants.EVENT_TOPIC);
         if (topicObj instanceof String)
         {
@@ -126,11 +136,11 @@
                 reason = "Neither of type String nor String[] : " + topicObj.getClass().getName();
             }
             LogWrapper.getLogger().log(
-                    this.reference,
-                    LogWrapper.LOG_WARNING,
-                    "Invalid EVENT_TOPICS : " + reason + " - Ignoring ServiceReference ["
-                        + this.reference + " | Bundle("
-                        + this.reference.getBundle() + ")]");
+                            this.reference,
+                            LogWrapper.LOG_WARNING,
+                            "Invalid EVENT_TOPICS : " + reason + " - Ignoring ServiceReference ["
+                                            + this.reference + " | Bundle("
+                                            + this.reference.getBundle() + ")]");
             this.topics = null;
             valid = false;
         }
@@ -149,39 +159,90 @@
                 {
                     valid = false;
                     LogWrapper.getLogger().log(
-                            this.reference,
-                            LogWrapper.LOG_WARNING,
-                            "Invalid EVENT_FILTER - Ignoring ServiceReference ["
-                                + this.reference + " | Bundle("
-                                + this.reference.getBundle() + ")]", e);
+                                    this.reference,
+                                    LogWrapper.LOG_WARNING,
+                                    "Invalid EVENT_FILTER - Ignoring ServiceReference ["
+                                                    + this.reference + " | Bundle("
+                                                    + this.reference.getBundle() + ")]", e);
                 }
             }
             else if ( filterObj != null )
             {
                 valid = false;
                 LogWrapper.getLogger().log(
-                        this.reference,
-                        LogWrapper.LOG_WARNING,
-                        "Invalid EVENT_FILTER - Ignoring ServiceReference ["
-                            + this.reference + " | Bundle("
-                            + this.reference.getBundle() + ")]");
+                                this.reference,
+                                LogWrapper.LOG_WARNING,
+                                "Invalid EVENT_FILTER - Ignoring ServiceReference ["
+                                                + this.reference + " | Bundle("
+                                                + this.reference.getBundle() + ")]");
             }
         }
         this.filter = handlerFilter;
 
+        // new in 1.3 - deliver
+        this.asyncOrderedDelivery = true;
+        Object delivery = reference.getProperty(EventConstants.EVENT_DELIVERY);
+        if ( delivery instanceof Collection )
+        {
+            delivery = ((Collection)delivery).toArray(new String[((Collection)delivery).size()]);
+        }
+        if ( delivery instanceof String )
+        {
+            this.asyncOrderedDelivery =  !(EventConstants.DELIVERY_ASYNC_UNORDERED.equals(delivery.toString()));
+        }
+        else if ( delivery instanceof String[] )
+        {
+            final String[] deliveryArray = (String[])delivery;
+            boolean foundOrdered = false, foundUnordered = false;
+            for(int i=0; i<deliveryArray.length; i++)
+            {
+                final String value = deliveryArray[i];
+                if ( EventConstants.DELIVERY_ASYNC_UNORDERED.equals(value) )
+                {
+                    foundUnordered = true;
+                }
+                else if ( EventConstants.DELIVERY_ASYNC_ORDERED.equals(value) )
+                {
+                    foundOrdered = true;
+                }
+                else
+                {
+                    LogWrapper.getLogger().log(
+                                    this.reference,
+                                    LogWrapper.LOG_WARNING,
+                                    "Invalid EVENT_DELIVERY - Ignoring invalid value for event delivery property " + value + " of ServiceReference ["
+                                                    + this.reference + " | Bundle("
+                                                    + this.reference.getBundle() + ")]");
+
+                }
+            }
+            if ( !foundOrdered && foundUnordered )
+            {
+                this.asyncOrderedDelivery = false;
+            }
+        }
+        else if ( delivery != null )
+        {
+            LogWrapper.getLogger().log(
+                            this.reference,
+                            LogWrapper.LOG_WARNING,
+                            "Invalid EVENT_DELIVERY - Ignoring event delivery property " + delivery + " of ServiceReference ["
+                                            + this.reference + " | Bundle("
+                                            + this.reference.getBundle() + ")]");
+        }
         // make sure to release the handler
         this.release();
 
         return valid;
-	}
+    }
 
-	/**
-	 * Dispose the proxy and release the handler
-	 */
-	public void dispose()
-	{
-	    this.release();
-	}
+    /**
+     * Dispose the proxy and release the handler
+     */
+    public void dispose()
+    {
+        this.release();
+    }
 
     /**
      * Get the event handler.
@@ -206,12 +267,12 @@
     }
 
     /**
-	 * Release the handler
-	 */
-	private synchronized void release()
-	{
-		if ( this.handler != null )
-		{
+     * Release the handler
+     */
+    private synchronized void release()
+    {
+        if ( this.handler != null )
+        {
             try
             {
                 this.handlerContext.bundleContext.ungetService(this.reference);
@@ -221,17 +282,17 @@
                 // event handler might be stopped - ignore
             }
             this.handler = null;
-		}
-	}
+        }
+    }
 
-	/**
-	 * Get the topics of this handler.
-	 * If this handler matches all topics <code>null</code> is returned
-	 */
-	public String[] getTopics()
-	{
-	    return this.topics;
-	}
+    /**
+     * Get the topics of this handler.
+     * If this handler matches all topics <code>null</code> is returned
+     */
+    public String[] getTopics()
+    {
+        return this.topics;
+    }
 
     /**
      * Check if this handler is allowed to receive the event
@@ -278,6 +339,14 @@
     }
 
     /**
+     * Should async events be delivered in order?
+     */
+    public boolean isAsyncOrderedDelivery()
+    {
+        return this.asyncOrderedDelivery;
+    }
+
+    /**
      * Check the timeout configuration for this handler.
      */
     private void checkTimeout(final String className)
@@ -300,43 +369,43 @@
     }
 
     /**
-	 * Send the event.
-	 */
-	public void sendEvent(final Event event)
-	{
-		final EventHandler handlerService = this.obtain();
-		if (handlerService == null)
-		{
-			return;
-		}
+     * Send the event.
+     */
+    public void sendEvent(final Event event)
+    {
+        final EventHandler handlerService = this.obtain();
+        if (handlerService == null)
+        {
+            return;
+        }
 
-		try
-		{
-			handlerService.handleEvent(event);
-		}
-		catch (final Throwable e)
-		{
+        try
+        {
+            handlerService.handleEvent(event);
+        }
+        catch (final Throwable e)
+        {
             // The spec says that we must catch exceptions and log them:
             LogWrapper.getLogger().log(
-                this.reference,
-                LogWrapper.LOG_WARNING,
-                "Exception during event dispatch [" + event + " | "
-                    + this.reference + " | Bundle("
-                    + this.reference.getBundle() + ")]", e);
-		}
-	}
+                            this.reference,
+                            LogWrapper.LOG_WARNING,
+                            "Exception during event dispatch [" + event + " | "
+                                            + this.reference + " | Bundle("
+                                            + this.reference.getBundle() + ")]", e);
+        }
+    }
 
-	/**
-	 * Blacklist the handler.
-	 */
-	public void blackListHandler()
-	{
+    /**
+     * Blacklist the handler.
+     */
+    public void blackListHandler()
+    {
         LogWrapper.getLogger().log(
-                LogWrapper.LOG_WARNING,
-                "Blacklisting ServiceReference [" + this.reference + " | Bundle("
-                    + this.reference.getBundle() + ")] due to timeout!");
+                        LogWrapper.LOG_WARNING,
+                        "Blacklisting ServiceReference [" + this.reference + " | Bundle("
+                                        + this.reference.getBundle() + ")] due to timeout!");
         this.blacklisted = true;
         // we can free the handler now.
         this.release();
-	}
+    }
 }
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java
index e3653f8..ecd9f3d 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java
@@ -18,8 +18,14 @@
  */
 package org.apache.felix.eventadmin.impl.tasks;
 
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.felix.eventadmin.impl.handler.EventHandlerProxy;
 import org.osgi.service.event.Event;
 
 /**
@@ -63,24 +69,42 @@
      */
     public void execute(final Collection tasks, final Event event)
     {
-        final Thread currentThread = Thread.currentThread();
-        TaskExecuter executer = null;
-        synchronized (m_running_threads )
+        final Iterator i = tasks.iterator();
+        boolean hasOrdered = false;
+        while ( i.hasNext() )
         {
-            TaskExecuter runningExecutor = (TaskExecuter)m_running_threads.get(currentThread);
-            if ( runningExecutor != null )
+            final EventHandlerProxy task = (EventHandlerProxy)i.next();
+            if ( !task.isAsyncOrderedDelivery() )
             {
-                runningExecutor.add(tasks, event);
+                // do somethimg
             }
             else
             {
-                executer = new TaskExecuter( tasks, event, currentThread );
-                m_running_threads.put(currentThread, executer);
+                hasOrdered = true;
             }
+
         }
-        if ( executer != null )
+        if ( hasOrdered )
         {
-            m_pool.executeTask(executer);
+            final Thread currentThread = Thread.currentThread();
+            TaskExecuter executer = null;
+            synchronized (m_running_threads )
+            {
+                final TaskExecuter runningExecutor = (TaskExecuter)m_running_threads.get(currentThread);
+                if ( runningExecutor != null )
+                {
+                    runningExecutor.add(tasks, event);
+                }
+                else
+                {
+                    executer = new TaskExecuter( tasks, event, currentThread );
+                    m_running_threads.put(currentThread, executer);
+                }
+            }
+            if ( executer != null )
+            {
+                m_pool.executeTask(executer);
+            }
         }
     }
 
@@ -106,7 +130,7 @@
                 {
                     tasks = (Object[]) m_tasks.remove(0);
                 }
-                m_deliver_task.execute((Collection)tasks[0], (Event)tasks[1]);
+                m_deliver_task.execute((Collection)tasks[0], (Event)tasks[1], true);
                 synchronized ( m_running_threads )
                 {
                     running = m_tasks.size() > 0;
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
index a06f2c8..59c702c 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
@@ -101,7 +101,7 @@
      * @param tasks The event handler dispatch tasks to execute
      *
      */
-    public void execute(final Collection tasks, final Event event)
+    public void execute(final Collection tasks, final Event event, final boolean filterAsyncUnordered)
     {
         final Thread sleepingThread = Thread.currentThread();
         final SyncThread syncThread = sleepingThread instanceof SyncThread ? (SyncThread)sleepingThread : null;
@@ -110,62 +110,64 @@
         while ( i.hasNext() )
         {
             final EventHandlerProxy task = (EventHandlerProxy)i.next();
-
-            if ( !useTimeout(task) )
+            if ( !filterAsyncUnordered || task.isAsyncOrderedDelivery() )
             {
-                // no timeout, we can directly execute
-                task.sendEvent(event);
-            }
-            else if ( syncThread != null )
-            {
-                // if this is a cascaded event, we directly use this thread
-                // otherwise we could end up in a starvation
-                final long startTime = System.currentTimeMillis();
-                task.sendEvent(event);
-                if ( System.currentTimeMillis() - startTime > this.timeout )
+                if ( !useTimeout(task) )
                 {
-                    task.blackListHandler();
+                    // no timeout, we can directly execute
+                    task.sendEvent(event);
                 }
-            }
-            else
-            {
-                final Rendezvous startBarrier = new Rendezvous();
-                final Rendezvous timerBarrier = new Rendezvous();
-                this.pool.executeTask(new Runnable()
+                else if ( syncThread != null )
                 {
-                    public void run()
+                    // if this is a cascaded event, we directly use this thread
+                    // otherwise we could end up in a starvation
+                    final long startTime = System.currentTimeMillis();
+                    task.sendEvent(event);
+                    if ( System.currentTimeMillis() - startTime > this.timeout )
                     {
-                        try
-                        {
-                            // notify the outer thread to start the timer
-                            startBarrier.waitForRendezvous();
-                            // execute the task
-                            task.sendEvent(event);
-                            // stop the timer
-                            timerBarrier.waitForRendezvous();
-                        }
-                        catch (final IllegalStateException ise)
-                        {
-                            // this can happen on shutdown, so we ignore it
-                        }
+                        task.blackListHandler();
                     }
-                });
-                // we wait for the inner thread to start
-                startBarrier.waitForRendezvous();
-
-                // timeout handling
-                // we sleep for the sleep time
-                // if someone wakes us up it's the finished inner task
-                try
-                {
-                    timerBarrier.waitAttemptForRendezvous(this.timeout);
                 }
-                catch (final TimeoutException ie)
+                else
                 {
-                    // if we timed out, we have to blacklist the handler
-                    task.blackListHandler();
-                }
+                    final Rendezvous startBarrier = new Rendezvous();
+                    final Rendezvous timerBarrier = new Rendezvous();
+                    this.pool.executeTask(new Runnable()
+                    {
+                        public void run()
+                        {
+                            try
+                            {
+                                // notify the outer thread to start the timer
+                                startBarrier.waitForRendezvous();
+                                // execute the task
+                                task.sendEvent(event);
+                                // stop the timer
+                                timerBarrier.waitForRendezvous();
+                            }
+                            catch (final IllegalStateException ise)
+                            {
+                                // this can happen on shutdown, so we ignore it
+                            }
+                        }
+                    });
+                    // we wait for the inner thread to start
+                    startBarrier.waitForRendezvous();
 
+                    // timeout handling
+                    // we sleep for the sleep time
+                    // if someone wakes us up it's the finished inner task
+                    try
+                    {
+                        timerBarrier.waitAttemptForRendezvous(this.timeout);
+                    }
+                    catch (final TimeoutException ie)
+                    {
+                        // if we timed out, we have to blacklist the handler
+                        task.blackListHandler();
+                    }
+
+                }
             }
         }
     }