blob: fd6e77937560f4217d7628d5c5141bcf14694052 [file] [log] [blame]
/*
* Copyright 2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.felix.eventadmin.impl.dispatch;
import java.util.ArrayList;
import java.util.List;
import org.apache.felix.eventadmin.impl.tasks.DeliverTask;
import org.apache.felix.eventadmin.impl.tasks.DispatchTask;
/**
* An implementation of a thread pool that uses a fixed number of cached threads
* but will spin-off new threads as needed. The underlying assumption is that
* threads that have been created more recently will be available sooner then older
* threads hence, once the pool size is reached older threads will be decoupled from
* the pool and the newly created are added to it.
*
* @author <a href="mailto:felix-dev@incubator.apache.org">Felix Project Team</a>
*/
// TODO: The least recently used method deployed is rather a hack in this case
// it really should be refactored into a plugable strategy. However, I believe
// it to be the best strategy in this case.
public class CacheThreadPool implements ThreadPool
{
// The internal lock for this object used instead synchronized(this)
// Note that it is used by the pooled threads created by this pool too. This is
// the reason why it is not private. Don't use it from the outside.
final Object m_lock = new Object();
// The pooled threads
private final PooledThread[] m_pool;
// The least recently used index
private final List m_index;
// Is this pool closed i.e., do we not pool thread anymore?
private boolean m_closed = false;
/**
* The constructor of the pool. The given size will be used as the max number of
* pooled threads.
*
* @param size The max number of threads pooled at a given time.
*/
public CacheThreadPool(final int size)
{
synchronized (m_lock)
{
m_pool = new PooledThread[size];
// We assume that a list is expanded once it reaches half of its capacity
// and it doesn't harm if the assumption is wrong.
m_index = new ArrayList(size + 1 + (size / 2));
}
}
/**
* Executes the task in a thread out of the pool or a new thread if no pooled
* thread is available. In case that the max size is reached the least recently
* used (i.e., the longest executing) thread in the pool is decoupled and a new
* one added to the pool that is used to execute the task.
*
* @param task The task to execute
* @param callback The callback associated with the task
*
* @see org.apache.felix.eventadmin.impl.dispatch.ThreadPool#execute(DispatchTask, DeliverTask)
*/
public void execute(final DispatchTask task, final DeliverTask callback)
{
// Note that we associate a callback with a task via the thread used to
// execute the task. In general, a free slot in the pool (i.e., m_pool[i] is
// null) can be used to set-up a new thread. Also note that we need to
// update the LRU index if we change the pool.
synchronized(m_lock)
{
if(m_closed)
{
// We are closed hence, spin-of a new thread for the new task.
final PooledThread result = new PooledThread();
// Set-up the thread and associate the task with the callback.
result.reset(task, callback);
// release the thread immediately since we don't pool anymore.
result.release();
return;
}
// Search in the pool for a free thread.
for (int i = 0; i < m_pool.length; i++)
{
// o.k. we found a free slot now set-up a new thread for it.
if (null == m_pool[i])
{
m_pool[i] = new PooledThread();
m_pool[i].reset(task, callback);
m_index.add(new Integer(i));
return;
}
else if (m_pool[i].available())
{
// we found a free thread now set it up.
m_pool[i].reset(task, callback);
final Integer idx = new Integer(i);
m_index.remove(idx);
m_index.add(idx);
return;
}
}
// The pool is full and no threads are available hence, spin-off a new
// thread and add it to the pool while decoupling the least recently used
// one. This assumes that older threads are likely to take longer to
// become available again then younger ones.
final int pos = ((Integer) m_index.remove(0)).intValue();
m_index.add(new Integer(pos));
m_pool[pos].release();
m_pool[pos] = new PooledThread();
m_pool[pos].reset(task, callback);
}
}
/**
* Look-up the callback associated with the task that the given thread is
* currently executing or return the default value that may be <tt>null</tt>.
*
* @param thread The thread that is currently executing the task for which to
* return the callback. In case the thread is not created by an instance of
* this class the default value will be returned.
* @param defaultCallback The value to return in case that the thread was not
* created by an instance of this class. May be <tt>null</tt>
* @return The callback associated with the given thread or the default value.
*
* @see org.apache.felix.eventadmin.impl.dispatch.ThreadPool#getCallback(Thread, DeliverTask)
*/
public DeliverTask getCallback(final Thread thread, final DeliverTask defaultCallback)
{
synchronized (m_lock)
{
if (thread instanceof PooledThread)
{
return ((PooledThread) thread).getCallback();
}
return defaultCallback;
}
}
/**
* Look-up the task that the given thread is currently executing or return the
* default value that may be <tt>null</tt> in case that the thread has not been
* created by an instance of this class.
*
* @param thread The thread whose currently executed task should be returned.
* @param defaultTask The default value to be returned in case that the thread
* was not created by this instance or doesn't currently has a task. May be
* <tt>null</tt>
* @return The task the given thread is currently executing or the defaultTask
*
* @see org.apache.felix.eventadmin.impl.dispatch.ThreadPool#getTask(Thread, DispatchTask)
*/
public DispatchTask getTask(Thread thread, DispatchTask defaultTask)
{
synchronized (m_lock)
{
if (thread instanceof PooledThread)
{
return ((PooledThread) thread).getTask();
}
return defaultTask;
}
}
/**
* Close the pool i.e, stop pooling threads. Note that subsequently, task will
* still be executed but no pooling is taking place anymore.
*
* @see org.apache.felix.eventadmin.impl.dispatch.ThreadPool#close()
*/
public void close()
{
synchronized (m_lock)
{
// We are closed hence, decouple all threads from the pool
for (int i = 0; i < m_pool.length; i++)
{
if (null != m_pool[i])
{
m_pool[i].release();
m_pool[i] = null;
}
}
m_closed = true;
}
}
/*
* The threads created by this pool. A PooledThread blocks until it gets a new
* task from the pool or is released. Additionally, it is used to associate
* the task it currently runs with its callback.
*/
private class PooledThread extends Thread
{
// The current task or null if none
private DispatchTask m_runnable = null;
// The callback associated with the current task
private DeliverTask m_callback = null;
// Is this thread decoupled from the pool (i.e, may cease to exists once its
// current task is finished)?
private boolean m_released = false;
/*
* This will set-up the thread as a daemon and start it too. No need to call
* its start method explicitly
*/
PooledThread()
{
setDaemon(true);
start();
}
/**
* Call next() in a loop until next() returns null indicating that we are
* done (i.e., decoupled from the pool) and may cease to exist.
*/
public void run()
{
for (Runnable next = next(); null != next; next = next())
{
next.run();
synchronized (m_lock)
{
m_runnable = null;
}
}
}
/*
* Block until a new task is available or we are decoupled from the pool.
* This will return the next task or null if we are decoupled from the pool
*/
private DispatchTask next()
{
synchronized (m_lock)
{
while (null == m_runnable)
{
if (m_released)
{
return null;
}
try
{
m_lock.wait();
} catch (InterruptedException e)
{
// Not needed
}
}
return m_runnable;
}
}
/*
* Set-up the thread for the next task
*/
void reset(final DispatchTask next, final DeliverTask callback)
{
synchronized (m_lock)
{
m_runnable = next;
m_callback = callback;
m_lock.notifyAll();
}
}
/*
* Return the callback associated with the current task
*/
DeliverTask getCallback()
{
synchronized (m_lock)
{
return m_callback;
}
}
/*
* Return whether this thread is available (i.e., has no task and has not
* been released) or not.
*/
boolean available()
{
synchronized (m_lock)
{
return (null == m_runnable) && (!m_released);
}
}
/*
* Return the current task or null if none
*/
DispatchTask getTask()
{
synchronized (m_lock)
{
return m_runnable;
}
}
/*
* Decouple this thread from the pool
*/
void release()
{
synchronized (m_lock)
{
m_released = true;
m_lock.notifyAll();
}
}
}
}