FELIX-4262 QueueServices should be observable
* Added QueueListener to observe from a third-party what's going on in the thread pool
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1528120 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueService.java b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueService.java
new file mode 100644
index 0000000..c5e6b58
--- /dev/null
+++ b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueService.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.felix.ipojo.extender.internal.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.felix.ipojo.extender.internal.AbstractService;
+import org.apache.felix.ipojo.extender.queue.Callback;
+import org.apache.felix.ipojo.extender.queue.JobInfo;
+import org.apache.felix.ipojo.extender.queue.QueueListener;
+import org.apache.felix.ipojo.extender.queue.QueueService;
+import org.osgi.framework.BundleContext;
+
+/**
+ * User: guillaume
+ * Date: 01/10/13
+ * Time: 14:41
+ */
+public abstract class AbstractQueueService extends AbstractService implements QueueService, QueueNotifier {
+
+ /**
+ * Store QueueListeners.
+ */
+ protected final List<QueueListener> m_listeners = new ArrayList<QueueListener>();
+
+ /**
+ * Constructor.
+ *
+ * @param bundleContext the bundle context
+ * @param type the specification
+ */
+ protected AbstractQueueService(final BundleContext bundleContext, final Class<?> type) {
+ super(bundleContext, type);
+ }
+
+ public void addQueueListener(final QueueListener listener) {
+ m_listeners.add(listener);
+ }
+
+ public void removeQueueListener(final QueueListener listener) {
+ m_listeners.remove(listener);
+ }
+
+ public void fireEnlistedJobInfo(JobInfo info) {
+ for (QueueListener listener : m_listeners) {
+ listener.enlisted(info);
+ }
+ }
+
+ public void fireStartedJobInfo(JobInfo info) {
+ for (QueueListener listener : m_listeners) {
+ listener.started(info);
+ }
+ }
+
+ public void fireExecutedJobInfo(JobInfo info, Object result) {
+ for (QueueListener listener : m_listeners) {
+ listener.executed(info, result);
+ }
+ }
+
+ public void fireFailedJobInfo(JobInfo info, Throwable throwable) {
+ for (QueueListener listener : m_listeners) {
+ listener.failed(info, throwable);
+ }
+ }
+
+}
diff --git a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/ExecutorQueueService.java b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/ExecutorQueueService.java
index a75c089..143174a 100644
--- a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/ExecutorQueueService.java
+++ b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/ExecutorQueueService.java
@@ -19,7 +19,6 @@
package org.apache.felix.ipojo.extender.internal.queue;
-import org.apache.felix.ipojo.extender.internal.AbstractService;
import org.apache.felix.ipojo.extender.internal.LifecycleQueueService;
import org.apache.felix.ipojo.extender.queue.Callback;
import org.apache.felix.ipojo.extender.queue.JobInfo;
@@ -36,7 +35,7 @@
/**
* An asynchronous implementation of the queue service. This implementation relies on an executor service.
*/
-public class ExecutorQueueService extends AbstractService implements LifecycleQueueService, ManagedService {
+public class ExecutorQueueService extends AbstractQueueService implements LifecycleQueueService, ManagedService {
/**
* Property name used to configure this ThreadPool's size (usable as System Property or ConfigAdmin property).
@@ -183,7 +182,8 @@
* @return the reference on the submitted job
*/
public <T> Future<T> submit(Callable<T> callable, Callback<T> callback, String description) {
- return m_executorService.submit(new JobInfoCallable<T>(m_statistic, callable, callback, description));
+ JobInfoCallable<T> task = new JobInfoCallable<T>(this, m_statistic, callable, callback, description);
+ return m_executorService.submit(task);
}
public <T> Future<T> submit(Callable<T> callable, String description) {
diff --git a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallable.java b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallable.java
index 2b98ca9..3693e16 100644
--- a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallable.java
+++ b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallable.java
@@ -31,6 +31,11 @@
public class JobInfoCallable<T> implements Callable<T>, JobInfo {
/**
+ * Notifier helper for {@link org.apache.felix.ipojo.extender.queue.QueueListener}.
+ */
+ private final QueueNotifier m_queueNotifier;
+
+ /**
* The statistic object.
*/
private final Statistic m_statistic;
@@ -68,20 +73,26 @@
/**
* Creates the job info callable.
*
+ * @param queueNotifier notifier for QueueListeners
* @param statistic the statistics that will be populated
* @param delegate the real job
* @param callback the callback notified when the job is completed
* @param description the job description
*/
- public JobInfoCallable(Statistic statistic,
+ public JobInfoCallable(QueueNotifier queueNotifier,
+ Statistic statistic,
Callable<T> delegate,
Callback<T> callback,
String description) {
+ m_queueNotifier = queueNotifier;
m_statistic = statistic;
m_delegate = delegate;
m_callback = callback;
m_description = description;
m_statistic.getWaiters().add(this);
+
+ // Assume that we will be enlisted in the next few cycles
+ m_queueNotifier.fireEnlistedJobInfo(this);
}
/**
@@ -96,20 +107,29 @@
startTime = System.currentTimeMillis();
m_statistic.getCurrentsCounter().incrementAndGet();
T result = null;
+ Exception exception = null;
try {
+ m_queueNotifier.fireStartedJobInfo(this);
result = m_delegate.call();
return result;
} catch (Exception e) {
+ m_queueNotifier.fireFailedJobInfo(this, e);
if (m_callback != null) {
m_callback.error(this, e);
}
+ exception = e;
throw e;
} finally {
m_statistic.getCurrentsCounter().decrementAndGet();
m_statistic.getFinishedCounter().incrementAndGet();
endTime = System.currentTimeMillis();
- if (m_callback != null) {
- m_callback.success(this, result);
+
+ // Only exec success callbacks when no error occurred
+ if (exception == null) {
+ m_queueNotifier.fireExecutedJobInfo(this, result);
+ if (m_callback != null) {
+ m_callback.success(this, result);
+ }
}
}
}
diff --git a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/QueueNotifier.java b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/QueueNotifier.java
new file mode 100644
index 0000000..c7fdcfc
--- /dev/null
+++ b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/QueueNotifier.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.felix.ipojo.extender.internal.queue;
+
+import org.apache.felix.ipojo.extender.queue.JobInfo;
+
+/**
+ * Internal interface to de-couple event producer and event listeners.
+ */
+public interface QueueNotifier {
+ void fireEnlistedJobInfo(JobInfo info);
+ void fireStartedJobInfo(JobInfo info);
+ void fireExecutedJobInfo(JobInfo info, Object result);
+ void fireFailedJobInfo(JobInfo info, Throwable throwable);
+}
diff --git a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/SynchronousQueueService.java b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/SynchronousQueueService.java
index 6800b29..d2533fb 100644
--- a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/SynchronousQueueService.java
+++ b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/SynchronousQueueService.java
@@ -26,6 +26,7 @@
import org.apache.felix.ipojo.extender.queue.QueueService;
import org.osgi.framework.BundleContext;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Dictionary;
import java.util.Hashtable;
@@ -35,7 +36,7 @@
/**
* An implementation of the Lifecycle Queue Service for synchronous processing.
*/
-public class SynchronousQueueService extends AbstractService implements LifecycleQueueService {
+public class SynchronousQueueService extends AbstractQueueService implements LifecycleQueueService {
private final Statistic m_statistic = new Statistic();
@@ -67,7 +68,7 @@
}
public <T> Future<T> submit(Callable<T> callable, Callback<T> callback, String description) {
- JobInfoCallable<T> exec = new JobInfoCallable<T>(m_statistic, callable, callback, description);
+ JobInfoCallable<T> exec = new JobInfoCallable<T>(this, m_statistic, callable, callback, description);
try {
return new ImmediateFuture<T>(exec.call());
} catch (Exception e) {
diff --git a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/PreferenceQueueService.java b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/PreferenceQueueService.java
index b1317cc..56b5c1c 100644
--- a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/PreferenceQueueService.java
+++ b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/PreferenceQueueService.java
@@ -22,6 +22,7 @@
import org.apache.felix.ipojo.extender.internal.LifecycleQueueService;
import org.apache.felix.ipojo.extender.queue.Callback;
import org.apache.felix.ipojo.extender.queue.JobInfo;
+import org.apache.felix.ipojo.extender.queue.QueueListener;
import org.apache.felix.ipojo.extender.queue.QueueService;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleReference;
@@ -158,4 +159,12 @@
public <T> Future<T> submit(Callable<T> callable) {
return submit(callable, "No description");
}
+
+ public void addQueueListener(final QueueListener listener) {
+ // Intentionally blank, not intended to have listeners
+ }
+
+ public void removeQueueListener(final QueueListener listener) {
+ // Intentionally blank, not intended to have listeners
+ }
}
\ No newline at end of file
diff --git a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/enforce/ForwardingQueueService.java b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/enforce/ForwardingQueueService.java
index 33139ae..c4cfafa 100644
--- a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/enforce/ForwardingQueueService.java
+++ b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/enforce/ForwardingQueueService.java
@@ -22,6 +22,7 @@
import org.apache.felix.ipojo.extender.internal.LifecycleQueueService;
import org.apache.felix.ipojo.extender.queue.Callback;
import org.apache.felix.ipojo.extender.queue.JobInfo;
+import org.apache.felix.ipojo.extender.queue.QueueListener;
import java.util.List;
import java.util.concurrent.Callable;
@@ -69,4 +70,12 @@
public <T> Future<T> submit(Callable<T> callable) {
return delegate().submit(callable);
}
+
+ public void addQueueListener(final QueueListener listener) {
+ delegate().addQueueListener(listener);
+ }
+
+ public void removeQueueListener(final QueueListener listener) {
+ delegate().removeQueueListener(listener);
+ }
}
diff --git a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueListener.java b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueListener.java
new file mode 100644
index 0000000..5518aa5
--- /dev/null
+++ b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueListener.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.felix.ipojo.extender.queue;
+
+/**
+ * A {@link org.apache.felix.ipojo.extender.queue.QueueListener} provides queue management information to external entities:
+ * <ul>
+ * <li>Job submission</li>
+ * <li>Job execution</li>
+ * <li>Job result (success or failure)</li>
+ * </ul>
+ *
+ * Implementer of this interface should not block as the invocation is done synchronously.
+ * Implementers are responsible to register themselves in the {@link org.apache.felix.ipojo.extender.queue.QueueService} they'll observe.
+ */
+public interface QueueListener {
+
+ /**
+ * Invoked when a job is just being enlisted (before processing).
+ * Only {@link JobInfo#getEnlistmentTime()} and {@link JobInfo#getWaitDuration()} provides meaningful values.
+ * Note that {@code waitDuration} value is re-evaluated at each call.
+ * @param info The job being enlisted
+ */
+ void enlisted(JobInfo info);
+
+ /**
+ * Invoked when a job's execution is just about to be started.
+ * Only {@link JobInfo#getEnlistmentTime()}, {@link JobInfo#getWaitDuration()}, {@link JobInfo#getStartTime()}
+ * and {@link JobInfo#getExecutionDuration()} provides meaningful values.
+ * Note that {@code executionDuration} value is re-evaluated at each call.
+ * @param info The job being started
+ */
+ void started(JobInfo info);
+
+ /**
+ * Invoked when a job's execution is finished successfully.
+ * Note the implementers should not retain any references to the provided {@code result} (memory leak).
+ * @param info The executed job
+ * @param result The job's result
+ */
+ void executed(JobInfo info, Object result);
+
+ /**
+ * Invoked when a job's execution is finished with error.
+ * Note the implementers should not retain any references to the provided {@code throwable} (memory leak).
+ * @param info The failed job
+ * @param throwable The job's thrown exception
+ */
+ void failed(JobInfo info, Throwable throwable);
+}
diff --git a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueService.java b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueService.java
index e66ba47..be52097 100644
--- a/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueService.java
+++ b/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueService.java
@@ -111,8 +111,15 @@
*/
<T> Future<T> submit(Callable<T> callable);
+ /**
+ * Add a {@link QueueListener} that will be notified on events relative to this {@link QueueService}.
+ * @param listener added listener
+ */
+ void addQueueListener(QueueListener listener);
- // TODO Add a way to add global callbacks
- //<T> void addGlobalCallback(Callback<T> callback);
- // <T> void removeGlobalCallback(Callback<T> callback);
+ /**
+ * Remove a {@link QueueListener} from this {@link QueueService}.
+ * @param listener removed listener
+ */
+ void removeQueueListener(QueueListener listener);
}
diff --git a/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueServiceTestCase.java b/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueServiceTestCase.java
new file mode 100644
index 0000000..94c6913
--- /dev/null
+++ b/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueServiceTestCase.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.felix.ipojo.extender.internal.queue;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.apache.felix.ipojo.extender.queue.Callback;
+import org.apache.felix.ipojo.extender.queue.JobInfo;
+import org.apache.felix.ipojo.extender.queue.QueueListener;
+import org.apache.felix.ipojo.extender.queue.QueueService;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
+import static org.mockito.Mockito.verify;
+
+import junit.framework.TestCase;
+
+/**
+ * User: guillaume
+ * Date: 01/10/13
+ * Time: 16:36
+ */
+public class AbstractQueueServiceTestCase extends TestCase {
+
+ @Mock
+ private BundleContext m_bundleContext;
+
+ @Mock
+ private JobInfo m_info;
+
+ @Mock
+ private QueueListener m_one;
+
+ @Mock
+ private QueueListener m_two;
+
+ private AbstractQueueService m_queueService;
+
+ @Override
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ m_queueService = new TestableAbstractQueueService(m_bundleContext, QueueService.class);
+ }
+
+ public void testFireEnlistedJobInfo() throws Exception {
+ m_queueService.addQueueListener(m_one);
+ m_queueService.addQueueListener(m_two);
+ m_queueService.fireEnlistedJobInfo(m_info);
+ verify(m_one).enlisted(m_info);
+ verify(m_two).enlisted(m_info);
+ }
+
+ public void testFireStartedJobInfo() throws Exception {
+ m_queueService.addQueueListener(m_one);
+ m_queueService.addQueueListener(m_two);
+ m_queueService.fireStartedJobInfo(m_info);
+ verify(m_one).started(m_info);
+ verify(m_two).started(m_info);
+ }
+
+ public void testFireExecutedJobInfo() throws Exception {
+ m_queueService.addQueueListener(m_one);
+ m_queueService.addQueueListener(m_two);
+ m_queueService.fireExecutedJobInfo(m_info, "hello");
+ verify(m_one).executed(m_info, "hello");
+ verify(m_two).executed(m_info, "hello");
+ }
+
+ public void testFireFailedJobInfo() throws Exception {
+ m_queueService.addQueueListener(m_one);
+ m_queueService.addQueueListener(m_two);
+ Exception throwable = new Exception();
+ m_queueService.fireFailedJobInfo(m_info, throwable);
+ verify(m_one).failed(m_info, throwable);
+ verify(m_two).failed(m_info, throwable);
+ }
+
+ private class TestableAbstractQueueService extends AbstractQueueService {
+
+ public TestableAbstractQueueService(final BundleContext bundleContext, final Class<?> type) {
+ super(bundleContext, type);
+ }
+
+ public int getFinished() {
+ return 0;
+ }
+
+ public int getWaiters() {
+ return 0;
+ }
+
+ public int getCurrents() {
+ return 0;
+ }
+
+ public List<JobInfo> getWaitersInfo() {
+ return null;
+ }
+
+ public <T> Future<T> submit(final Callable<T> callable, final Callback<T> callback, final String description) {
+ return null;
+ }
+
+ public <T> Future<T> submit(final Callable<T> callable, final String description) {
+ return null;
+ }
+
+ public <T> Future<T> submit(final Callable<T> callable) {
+ return null;
+ }
+ }
+
+}
diff --git a/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallableTestCase.java b/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallableTestCase.java
index 1b1541a..2c8b2e4 100644
--- a/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallableTestCase.java
+++ b/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallableTestCase.java
@@ -19,7 +19,15 @@
package org.apache.felix.ipojo.extender.internal.queue;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.apache.felix.ipojo.extender.internal.queue.callable.ExceptionCallable;
import org.apache.felix.ipojo.extender.internal.queue.callable.StringCallable;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
import junit.framework.TestCase;
@@ -27,10 +35,19 @@
* Checks the job info callable.
*/
public class JobInfoCallableTestCase extends TestCase {
+
+ @Mock
+ private QueueNotifier m_notifier;
+
+ @Override
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ }
+
public void testCall() throws Exception {
Statistic stat = new Statistic();
long mark = System.currentTimeMillis();
- JobInfoCallable<String> info = new JobInfoCallable<String>(stat, new StringCallable(), null, null);
+ JobInfoCallable<String> info = new JobInfoCallable<String>(m_notifier, stat, new StringCallable(), null, null);
// Before execution
assertTrue(info.getEnlistmentTime() >= mark);
@@ -49,5 +66,31 @@
assertEquals(0, stat.getCurrentsCounter().get());
assertEquals(1, stat.getFinishedCounter().get());
+ InOrder order = Mockito.inOrder(m_notifier);
+ order.verify(m_notifier).fireEnlistedJobInfo(info);
+ order.verify(m_notifier).fireStartedJobInfo(info);
+ order.verify(m_notifier).fireExecutedJobInfo(info, "hello");
+ verifyNoMoreInteractions(m_notifier);
+
+ }
+
+ public void testFailedCall() throws Exception {
+ Statistic stat = new Statistic();
+ Exception e = new Exception();
+ JobInfoCallable<String> info = new JobInfoCallable<String>(m_notifier, stat, new ExceptionCallable(e), null, null);
+
+ try {
+ info.call();
+ } catch (Exception e1) {
+ InOrder order = Mockito.inOrder(m_notifier);
+ order.verify(m_notifier).fireEnlistedJobInfo(info);
+ order.verify(m_notifier).fireStartedJobInfo(info);
+ order.verify(m_notifier).fireFailedJobInfo(info, e);
+ verifyNoMoreInteractions(m_notifier);
+ return;
+ }
+
+ fail("Should have throw an Exception");
+
}
}
diff --git a/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/callable/ExceptionCallable.java b/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/callable/ExceptionCallable.java
new file mode 100644
index 0000000..9ef2d00
--- /dev/null
+++ b/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/callable/ExceptionCallable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.felix.ipojo.extender.internal.queue.callable;
+
+import java.util.concurrent.Callable;
+
+/**
+* A dummy job.
+*/
+public class ExceptionCallable implements Callable<String> {
+
+ private final Exception m_exception;
+
+ public ExceptionCallable(Exception e) {
+ m_exception = e;
+ }
+
+ public String call() throws Exception {
+ throw m_exception;
+ }
+}