FELIX-4630 : Adding PerformanceTestIT to measure difference between send and post events. Apply patch from Freddy Guime
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1622446 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/eventadmin/impl/changelog.txt b/eventadmin/impl/changelog.txt
index 8550b50..2096752 100644
--- a/eventadmin/impl/changelog.txt
+++ b/eventadmin/impl/changelog.txt
@@ -3,6 +3,7 @@
** Improvement
* [FELIX-4623] - Make Async to Sync ThreadPool Ratio Configurable
* [FELIX-4629] - Documentation - Properties and Property Defaults Incorrect
+ * [FELIX-4630] - Adding PerformanceTestIT to measure difference between send and post events
** Bug
* [FELIX-4627] - Potential Memory Leak in AsyncDeliverTasks
* [FELIX-4617] - Empty configurations for ignore topic and ignore timeout lead to error messages in the log
diff --git a/eventadmin/impl/src/test/java/org/apache/felix/eventadmin/ittests/PerformanceTestIT.java b/eventadmin/impl/src/test/java/org/apache/felix/eventadmin/ittests/PerformanceTestIT.java
new file mode 100644
index 0000000..ecb13d9
--- /dev/null
+++ b/eventadmin/impl/src/test/java/org/apache/felix/eventadmin/ittests/PerformanceTestIT.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.felix.eventadmin.ittests;
+
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.options.AbstractDelegateProvisionOption;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.ops4j.pax.exam.Constants.START_LEVEL_SYSTEM_BUNDLES;
+import static org.ops4j.pax.exam.CoreOptions.*;
+
+@RunWith(PaxExam.class)
+public class PerformanceTestIT {
+ // the name of the system property providing the bundle file to be installed and tested
+ private static final String BUNDLE_JAR_SYS_PROP = "project.bundle.file";
+
+ /** The logger. */
+ protected static final Logger logger = LoggerFactory.getLogger(PerformanceTestIT.class);
+ private static final int RUNS = 5;
+ public static final int BATCH_SIZE = 500000;
+
+ @Inject
+ protected BundleContext bundleContext;
+
+ /** Event admin reference. */
+ private ServiceReference eventAdminReference;
+
+ /** Event admin. */
+ private EventAdmin eventAdmin;
+
+ final AtomicLong counter = new AtomicLong();
+
+ Collection<Listener> listeners = new ArrayList<Listener>();
+
+ @Configuration
+ public static Option[] configuration() {
+ final String bundleFileName = System.getProperty( BUNDLE_JAR_SYS_PROP );
+ logger.info("Bundle jar at :"+bundleFileName);
+ final File bundleFile = new File( bundleFileName );
+ if ( !bundleFile.canRead() ) {
+ throw new IllegalArgumentException( "Cannot read from bundle file " + bundleFileName + " specified in the "
+ + BUNDLE_JAR_SYS_PROP + " system property" );
+ }
+ return options(
+ vmOption("-Xms1024m"),
+// vmOption("-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"),
+ provision(
+ mavenBundle( "org.ops4j.pax.tinybundles", "tinybundles", "1.0.0" ),
+ mavenBundle("org.apache.sling", "org.apache.sling.commons.log", "2.1.2"),
+ mavenBundle("org.apache.felix", "org.apache.felix.configadmin", "1.2.8"),
+ mavenBundle("org.apache.felix", "org.apache.felix.metatype", "1.0.4"),
+ CoreOptions.bundle(bundleFile.toURI().toString()),
+ mavenBundle("org.ops4j.pax.url", "pax-url-mvn", "1.3.5")
+ ),
+ // below is instead of normal Pax Exam junitBundles() to deal
+ // with build server issue
+ new DirectURLJUnitBundlesOption(),
+ systemProperty("pax.exam.invoker").value("junit"),
+ bundle("link:classpath:META-INF/links/org.ops4j.pax.exam.invoker.junit.link")
+ );
+ }
+
+ protected EventAdmin loadEventAdmin() {
+ if ( eventAdminReference == null || eventAdminReference.getBundle() == null ) {
+ eventAdmin = null;
+ eventAdminReference = bundleContext.getServiceReference(EventAdmin.class.getName());
+ }
+ if ( eventAdmin == null && eventAdminReference != null ) {
+ eventAdmin = (EventAdmin) bundleContext.getService(eventAdminReference);
+ }
+ return eventAdmin;
+ }
+
+ public void addListener(Listener listener, String... topics) {
+ listener.register(bundleContext,topics);
+ listeners.add(listener);
+ }
+
+ private void removeListener(Listener listener) {
+ listener.unregister();
+ }
+
+
+ protected void send(String topic, Dictionary<String, Object> properties, boolean sync) {
+ final Event event = new Event(topic, properties);
+ if ( sync ) {
+ eventAdmin.sendEvent(event);
+ } else {
+ eventAdmin.postEvent(event);
+ }
+ }
+
+
+ @Test
+ public void measureThroughputSend() {
+ loadEventAdmin();
+ addListener(new Listener() {
+ @Override
+ public void handleEvent(Event event) {
+ long calledTimes = counter.incrementAndGet();
+ if (calledTimes == BATCH_SIZE ) {
+ synchronized (counter) {
+ counter.notify();
+ }
+ }
+
+ }
+ }, "topic");
+
+ // Warm-up
+ Hashtable<String, Object> properties = new Hashtable<String, Object>();
+ for (int i= 0;i < BATCH_SIZE;i++) {
+ properties.put("key",i);
+ send("topic", properties, false);
+ }
+ int average =0;
+ for (int runs = 0; runs < RUNS;runs ++) {
+
+ final CountDownLatch latch = new CountDownLatch(BATCH_SIZE);
+ addListener(new Listener() {
+ @Override
+ public void handleEvent(Event event) {
+ latch.countDown();
+ }
+ }, "topic" + runs);
+
+ ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(BATCH_SIZE+1);
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
+ 1,
+ 1000,
+ TimeUnit.MILLISECONDS, workQueue);
+
+
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ final String topicString = "topic"+runs;
+ final Hashtable<String,Object> localProperties = new Hashtable<String, Object>();
+ localProperties.put(topicString,i);
+ workQueue.add(new Runnable() {
+ @Override
+ public void run() {
+ send(topicString, localProperties, true);
+ }
+ });
+ }
+
+ long startTime = System.nanoTime();
+ executor.prestartAllCoreThreads();
+
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ long endTime = System.nanoTime();
+ long milliseconds = (endTime - startTime) / 1000000;
+ logger.info("Post Run "+runs+" Elapsed :" + milliseconds);
+ average += milliseconds;
+ }
+
+ logger.info("Send Avg: "+average / RUNS);
+ }
+
+ @Test
+ public void measureThroughputPost() {
+ loadEventAdmin();
+ addListener(new Listener() {
+ @Override
+ public void handleEvent(Event event) {
+ long calledTimes = counter.incrementAndGet();
+ if (calledTimes == BATCH_SIZE ) {
+ synchronized (counter) {
+ counter.notify();
+ }
+ }
+
+ }
+ }, "topic");
+
+ // Warm-up
+ Hashtable<String, Object> properties = new Hashtable<String, Object>();
+ for (int i= 0;i < BATCH_SIZE;i++) {
+ properties.put("key",i);
+ send("topic", properties, false);
+ }
+ int average =0;
+ for (int runs = 0; runs < RUNS;runs ++) {
+
+ final CountDownLatch latch = new CountDownLatch(BATCH_SIZE);
+ addListener(new Listener() {
+ @Override
+ public void handleEvent(Event event) {
+ latch.countDown();
+ }
+ }, "topic" + runs);
+
+ ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(BATCH_SIZE+1);
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
+ Runtime.getRuntime().availableProcessors(),
+ 1000,
+ TimeUnit.MILLISECONDS, workQueue);
+
+
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ final String topicString = "topic"+runs;
+ final Hashtable<String,Object> localProperties = new Hashtable<String, Object>();
+ localProperties.put(topicString,i);
+ workQueue.add(new Runnable() {
+ @Override
+ public void run() {
+ send(topicString, localProperties, false);
+ }
+ });
+ }
+
+ long startTime = System.nanoTime();
+ executor.prestartAllCoreThreads();
+
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ long endTime = System.nanoTime();
+ long milliseconds = (endTime - startTime) / 1000000;
+ logger.info("Post Run "+runs+" Elapsed :" + milliseconds);
+ average += milliseconds;
+ }
+
+ logger.info("Post Avg: "+average / RUNS);
+ }
+
+ @After
+ public void tearDown() {
+ for (Listener listener : listeners) {
+ removeListener(listener);
+ }
+ }
+
+
+
+ private static abstract class Listener implements EventHandler {
+ private ServiceRegistration registration;
+
+ protected Listener() {
+ }
+
+ public void register(BundleContext bundleContext, String...topics) {
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ if ( topics != null ) {
+ props.put("event.topics", topics);
+ } else {
+ props.put("event.topics", "*");
+ }
+ this.registration = bundleContext.registerService(EventHandler.class.getName(), this, props);
+ }
+
+ public void unregister() {
+ registration.unregister();
+ }
+ }
+
+
+ private static class DirectURLJUnitBundlesOption
+ extends AbstractDelegateProvisionOption<DirectURLJUnitBundlesOption> {
+
+ /**
+ * Constructor.
+ */
+ public DirectURLJUnitBundlesOption(){
+ super(
+ bundle("http://repository.springsource.com/ivy/bundles/external/org.junit/com.springsource.org.junit/4.9.0/com.springsource.org.junit-4.9.0.jar")
+ );
+ noUpdate();
+ startLevel(START_LEVEL_SYSTEM_BUNDLES);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ return String.format("DirectURLJUnitBundlesOption{url=%s}", getURL());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected DirectURLJUnitBundlesOption itself() {
+ return this;
+ }
+
+ }
+}