[FELIX-3997] Provide an abstract bundle extender
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1461027 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/utils/pom.xml b/utils/pom.xml
index 2c93ef7..4c0e2c8 100644
--- a/utils/pom.xml
+++ b/utils/pom.xml
@@ -26,7 +26,7 @@
<modelVersion>4.0.0</modelVersion>
<name>Apache Felix Utils</name>
<description>Utility classes for OSGi.</description>
- <version>1.2.1-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<artifactId>org.apache.felix.utils</artifactId>
<scm>
<connection>scm:svn:http://svn.apache.org/repos/asf/felix/trunk/utils</connection>
@@ -42,7 +42,7 @@
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
- <version>4.1.0</version>
+ <version>4.2.0</version>
</dependency>
</dependencies>
<build>
diff --git a/utils/src/main/java/org/apache/felix/utils/extender/AbstractExtender.java b/utils/src/main/java/org/apache/felix/utils/extender/AbstractExtender.java
new file mode 100644
index 0000000..22166a0
--- /dev/null
+++ b/utils/src/main/java/org/apache/felix/utils/extender/AbstractExtender.java
@@ -0,0 +1,326 @@
+/*
+ * Copyright 2013 Guillaume Nodet.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ *
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.felix.utils.extender;
+
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.BundleEvent;
+import org.osgi.framework.Constants;
+import org.osgi.framework.SynchronousBundleListener;
+import org.osgi.util.tracker.BundleTracker;
+import org.osgi.util.tracker.BundleTrackerCustomizer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class to write bundle extenders.
+ * This extender tracks started bundles (or starting if they have a lazy activation
+ * policy) and will create an {@link Extension} for each of them to manage it.
+ *
+ * The extender will handle all concurrency and synchronization issues, see
+ * {@link Extension} for more information about the additional constraints.
+ *
+ * The extender guarantee that all extensions will be stopped synchronously with
+ * the STOPPING event of a given bundle and that all extensions will be stopped
+ * before the extender bundle is stopped.
+ *
+ */
+public abstract class AbstractExtender implements BundleActivator, BundleTrackerCustomizer, SynchronousBundleListener {
+
+ private final ConcurrentMap<Bundle, Extension> extensions = new ConcurrentHashMap<Bundle, Extension>();
+ private final ConcurrentMap<Bundle, FutureTask> destroying = new ConcurrentHashMap<Bundle, FutureTask>();
+ private volatile boolean stopping;
+
+ private boolean synchronous;
+ private boolean preemptiveShutdown;
+ private BundleContext context;
+ private ExecutorService executors;
+ private BundleTracker tracker;
+
+ /**
+ * Check if the extender is synchronous or not.
+ * If the flag is set, the extender will start the extension synchronously
+ * with the bundle being tracked or started. Else, the starting of the
+ * extension will be delegated to a thread pool.
+ *
+ * @return if the extender is synchronous
+ */
+ public boolean isSynchronous() {
+ return synchronous;
+ }
+
+ /**
+ * Check if the extender performs a preemptive shutdown
+ * of all extensions when the framework is being stopped.
+ * The default behavior is to wait for the framework to stop
+ * the bundles and stop the extension at that time.
+ *
+ * @return if the extender use a preemptive shutdown
+ */
+ public boolean isPreemptiveShutdown() {
+ return preemptiveShutdown;
+ }
+
+ public BundleContext getBundleContext() {
+ return context;
+ }
+
+ public ExecutorService getExecutors() {
+ return executors;
+ }
+
+ public void setSynchronous(boolean synchronous) {
+ this.synchronous = synchronous;
+ }
+
+ public void setPreemptiveShutdown(boolean preemptiveShutdown) {
+ this.preemptiveShutdown = preemptiveShutdown;
+ }
+
+ public void start(BundleContext context) throws Exception {
+ this.context = context;
+ this.context.addBundleListener(this);
+ this.tracker = new BundleTracker(this.context, Bundle.ACTIVE | Bundle.STARTING, this);
+ if (!this.synchronous) {
+ this.executors = createExecutor();
+ }
+ doStart();
+ }
+
+ public void stop(BundleContext context) throws Exception {
+ stopping = true;
+ while (!extensions.isEmpty()) {
+ Collection<Bundle> toDestroy = chooseBundlesToDestroy(extensions.keySet());
+ if (toDestroy == null || toDestroy.isEmpty()) {
+ toDestroy = new ArrayList<Bundle>(extensions.keySet());
+ }
+ for (Bundle bundle : toDestroy) {
+ destroyExtension(bundle);
+ }
+ }
+ doStop();
+ if (executors != null) {
+ executors.shutdown();
+ try {
+ executors.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ executors = null;
+ }
+ }
+
+ protected void doStart() throws Exception {
+ startTracking();
+ }
+
+ protected void doStop() throws Exception {
+ stopTracking();
+ }
+
+ protected void startTracking() {
+ this.tracker.open();
+ }
+
+ protected void stopTracking() {
+ this.tracker.close();
+ }
+
+ /**
+ * Create the executor used to start extensions asynchronously.
+ *
+ * @return an
+ */
+ protected ExecutorService createExecutor() {
+ return Executors.newScheduledThreadPool(3);
+ }
+
+ /**
+ *
+ * @param bundles
+ * @return
+ */
+ protected Collection<Bundle> chooseBundlesToDestroy(Set<Bundle> bundles) {
+ return null;
+ }
+
+
+ public void bundleChanged(BundleEvent event) {
+ Bundle bundle = event.getBundle();
+ if (bundle.getState() != Bundle.ACTIVE && bundle.getState() != Bundle.STARTING) {
+ // The bundle is not in STARTING or ACTIVE state anymore
+ // so destroy the context. Ignore our own bundle since it
+ // needs to kick the orderly shutdown.
+ if (bundle != this.context.getBundle()) {
+ destroyExtension(bundle);
+ }
+ }
+ }
+
+ public Object addingBundle(Bundle bundle, BundleEvent event) {
+ modifiedBundle(bundle, event, bundle);
+ return bundle;
+ }
+
+ public void modifiedBundle(Bundle bundle, BundleEvent event, Object object) {
+ // If the bundle being stopped is the system bundle,
+ // do an orderly shutdown of all blueprint contexts now
+ // so that service usage can actually be useful
+ if (bundle.getBundleId() == 0 && bundle.getState() == Bundle.STOPPING) {
+ if (preemptiveShutdown) {
+ try {
+ stop(context);
+ } catch (Exception e) {
+ error("Error while performing preemptive shutdown", e);
+ }
+ return;
+ }
+ }
+ if (bundle.getState() != Bundle.ACTIVE && bundle.getState() != Bundle.STARTING) {
+ // The bundle is not in STARTING or ACTIVE state anymore
+ // so destroy the context. Ignore our own bundle since it
+ // needs to kick the orderly shutdown and not unregister the namespaces.
+ if (bundle != this.context.getBundle()) {
+ destroyExtension(bundle);
+ }
+ return;
+ }
+ // Do not track bundles given we are stopping
+ if (stopping) {
+ return;
+ }
+ // For starting bundles, ensure, it's a lazy activation,
+ // else we'll wait for the bundle to become ACTIVE
+ if (bundle.getState() == Bundle.STARTING) {
+ String activationPolicyHeader = (String) bundle.getHeaders().get(Constants.BUNDLE_ACTIVATIONPOLICY);
+ if (activationPolicyHeader == null || !activationPolicyHeader.startsWith(Constants.ACTIVATION_LAZY)) {
+ // Do not track this bundle yet
+ return;
+ }
+ }
+ createExtension(bundle);
+ }
+
+ public void removedBundle(Bundle bundle, BundleEvent event, Object object) {
+ // Nothing to do
+ destroyExtension(bundle);
+ }
+
+ private void createExtension(final Bundle bundle) {
+ try {
+ BundleContext bundleContext = bundle.getBundleContext();
+ if (bundleContext == null) {
+ // The bundle has been stopped in the mean time
+ return;
+ }
+ final Extension extension = doCreateExtension(bundle);
+ if (extension == null) {
+ // This bundle is not to be extended
+ return;
+ }
+ synchronized (extensions) {
+ if (extensions.putIfAbsent(bundle, extension) != null) {
+ return;
+ }
+ }
+ if (synchronous) {
+ debug(bundle, "Starting extension synchronously");
+ extension.start();
+ } else {
+ debug(bundle, "Scheduling asynchronous start of extension");
+ getExecutors().submit(new Runnable() {
+ public void run() {
+ try {
+ extension.start();
+ } catch (Exception e) {
+ warn(bundle, "Error starting extension", e);
+ }
+ }
+ });
+ }
+ } catch (Throwable t) {
+ warn(bundle, "Error while creating extension", t);
+ }
+ }
+
+ private void destroyExtension(final Bundle bundle) {
+ FutureTask future;
+ synchronized (extensions) {
+ debug(bundle, "Starting destruction process");
+ future = destroying.get(bundle);
+ if (future == null) {
+ final Extension extension = extensions.remove(bundle);
+ if (extension != null) {
+ debug(bundle, "Scheduling extension destruction");
+ future = new FutureTask<Void>(new Runnable() {
+ public void run() {
+ debug(bundle, "Destroying extension");
+ try {
+ extension.destroy();
+ } catch (Exception e) {
+ warn(bundle, "Error while destroying extension", e);
+ } finally {
+ debug(bundle, "Finished destroying extension");
+ synchronized (extensions) {
+ destroying.remove(bundle);
+ }
+ }
+ }
+ }, null);
+ destroying.put(bundle, future);
+ } else {
+ debug(bundle, "Not an extended bundle or destruction of extension already finished");
+ }
+ } else {
+ debug(bundle, "Destruction already scheduled");
+ }
+ }
+ if (future != null) {
+ try {
+ debug(bundle, "Waiting for extension destruction");
+ future.run();
+ future.get();
+ } catch (Throwable t) {
+ warn(bundle, "Error while destroying extension", t);
+ }
+ }
+ }
+
+ /**
+ * Create the extension for the given bundle, or null if the bundle is not to be extended.
+ *
+ * @param bundle the bundle to extend
+ * @return
+ * @throws Exception
+ */
+ protected abstract Extension doCreateExtension(Bundle bundle) throws Exception;
+
+ protected abstract void debug(Bundle bundle, String msg);
+ protected abstract void warn(Bundle bundle, String msg, Throwable t);
+ protected abstract void error(String msg, Throwable t);
+
+}
diff --git a/utils/src/main/java/org/apache/felix/utils/extender/Extension.java b/utils/src/main/java/org/apache/felix/utils/extender/Extension.java
new file mode 100644
index 0000000..24a818f
--- /dev/null
+++ b/utils/src/main/java/org/apache/felix/utils/extender/Extension.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2013 Guillaume Nodet.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ *
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.felix.utils.extender;
+
+/**
+ * A simple interface used by the extender to manage extensions.
+ */
+public interface Extension {
+
+ /**
+ * Start this extension. Starting and stopping of the extension
+ * should be synchronized.
+ */
+ void start() throws Exception;
+
+ /**
+ * Destroy should be synchronous and only return when the extension
+ * has been fully destroyed. In addition it must be synchronized with
+ * start, because start() and destroy() can be called concurrently.
+ */
+ void destroy() throws Exception;
+
+}
diff --git a/utils/src/main/java/org/apache/felix/utils/extender/SimpleExtension.java b/utils/src/main/java/org/apache/felix/utils/extender/SimpleExtension.java
new file mode 100644
index 0000000..e5ecc05
--- /dev/null
+++ b/utils/src/main/java/org/apache/felix/utils/extender/SimpleExtension.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2013 Guillaume Nodet.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ *
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.felix.utils.extender;
+
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class SimpleExtension implements Extension {
+
+ private final Bundle bundle;
+ private final BundleContext bundleContext;
+ private final AtomicBoolean destroyed = new AtomicBoolean(false);
+
+ public SimpleExtension(Bundle bundle) {
+ this.bundle = bundle;
+ this.bundleContext = bundle.getBundleContext();
+ }
+
+ public boolean isDestroyed() {
+ synchronized (getLock()) {
+ return destroyed.get();
+ }
+ }
+
+ public void start() throws Exception {
+ synchronized (getLock()) {
+ if (destroyed.get()) {
+ return;
+ }
+ if (bundle.getState() != Bundle.ACTIVE) {
+ return;
+ }
+ if (bundle.getBundleContext() != bundleContext) {
+ return;
+ }
+ doStart();
+ }
+ }
+
+ public void destroy() throws Exception {
+ synchronized (getLock()) {
+ destroyed.set(true);
+ }
+ doDestroy();
+ }
+
+ protected Object getLock() {
+ return this;
+ }
+
+ protected abstract void doStart() throws Exception;
+
+ protected abstract void doDestroy() throws Exception;
+
+}