[FELIX-3997] Provide an abstract bundle extender

git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1461027 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/utils/pom.xml b/utils/pom.xml
index 2c93ef7..4c0e2c8 100644
--- a/utils/pom.xml
+++ b/utils/pom.xml
@@ -26,7 +26,7 @@
   <modelVersion>4.0.0</modelVersion>
   <name>Apache Felix Utils</name>
   <description>Utility classes for OSGi.</description>
-  <version>1.2.1-SNAPSHOT</version>
+  <version>1.3.0-SNAPSHOT</version>
   <artifactId>org.apache.felix.utils</artifactId>
   <scm>
       <connection>scm:svn:http://svn.apache.org/repos/asf/felix/trunk/utils</connection>
@@ -42,7 +42,7 @@
     <dependency>
       <groupId>org.osgi</groupId>
       <artifactId>org.osgi.compendium</artifactId>
-      <version>4.1.0</version>
+      <version>4.2.0</version>
     </dependency>
   </dependencies>
   <build>
diff --git a/utils/src/main/java/org/apache/felix/utils/extender/AbstractExtender.java b/utils/src/main/java/org/apache/felix/utils/extender/AbstractExtender.java
new file mode 100644
index 0000000..22166a0
--- /dev/null
+++ b/utils/src/main/java/org/apache/felix/utils/extender/AbstractExtender.java
@@ -0,0 +1,326 @@
+/*
+ * Copyright 2013 Guillaume Nodet.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ *
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.felix.utils.extender;
+
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.BundleEvent;
+import org.osgi.framework.Constants;
+import org.osgi.framework.SynchronousBundleListener;
+import org.osgi.util.tracker.BundleTracker;
+import org.osgi.util.tracker.BundleTrackerCustomizer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class to write bundle extenders.
+ * This extender tracks started bundles (or starting if they have a lazy activation
+ * policy) and will create an {@link Extension} for each of them to manage it.
+ *
+ * The extender will handle all concurrency and synchronization issues, see
+ * {@link Extension} for more information about the additional constraints.
+ *
+ * The extender guarantee that all extensions will be stopped synchronously with
+ * the STOPPING event of a given bundle and that all extensions will be stopped
+ * before the extender bundle is stopped.
+ *
+ */
+public abstract class AbstractExtender implements BundleActivator, BundleTrackerCustomizer, SynchronousBundleListener {
+
+    private final ConcurrentMap<Bundle, Extension> extensions = new ConcurrentHashMap<Bundle, Extension>();
+    private final ConcurrentMap<Bundle, FutureTask> destroying = new ConcurrentHashMap<Bundle, FutureTask>();
+    private volatile boolean stopping;
+
+    private boolean synchronous;
+    private boolean preemptiveShutdown;
+    private BundleContext context;
+    private ExecutorService executors;
+    private BundleTracker tracker;
+
+    /**
+     * Check if the extender is synchronous or not.
+     * If the flag is set, the extender will start the extension synchronously
+     * with the bundle being tracked or started.  Else, the starting of the
+     * extension will be delegated to a thread pool.
+     *
+     * @return if the extender is synchronous
+     */
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    /**
+     * Check if the extender performs a preemptive shutdown
+     * of all extensions when the framework is being stopped.
+     * The default behavior is to wait for the framework to stop
+     * the bundles and stop the extension at that time.
+     *
+     * @return if the extender use a preemptive shutdown
+     */
+    public boolean isPreemptiveShutdown() {
+        return preemptiveShutdown;
+    }
+
+    public BundleContext getBundleContext() {
+        return context;
+    }
+
+    public ExecutorService getExecutors() {
+        return executors;
+    }
+
+    public void setSynchronous(boolean synchronous) {
+        this.synchronous = synchronous;
+    }
+
+    public void setPreemptiveShutdown(boolean preemptiveShutdown) {
+        this.preemptiveShutdown = preemptiveShutdown;
+    }
+
+    public void start(BundleContext context) throws Exception {
+        this.context = context;
+        this.context.addBundleListener(this);
+        this.tracker = new BundleTracker(this.context, Bundle.ACTIVE | Bundle.STARTING, this);
+        if (!this.synchronous) {
+            this.executors = createExecutor();
+        }
+        doStart();
+    }
+
+    public void stop(BundleContext context) throws Exception {
+        stopping = true;
+        while (!extensions.isEmpty()) {
+            Collection<Bundle> toDestroy = chooseBundlesToDestroy(extensions.keySet());
+            if (toDestroy == null || toDestroy.isEmpty()) {
+                toDestroy = new ArrayList<Bundle>(extensions.keySet());
+            }
+            for (Bundle bundle : toDestroy) {
+                destroyExtension(bundle);
+            }
+        }
+        doStop();
+        if (executors != null) {
+            executors.shutdown();
+            try {
+                executors.awaitTermination(60, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+            executors = null;
+        }
+    }
+
+    protected void doStart() throws Exception {
+        startTracking();
+    }
+
+    protected void doStop() throws Exception {
+        stopTracking();
+    }
+
+    protected void startTracking() {
+        this.tracker.open();
+    }
+
+    protected void stopTracking() {
+        this.tracker.close();
+    }
+
+    /**
+     * Create the executor used to start extensions asynchronously.
+     *
+     * @return an
+     */
+    protected ExecutorService createExecutor() {
+        return Executors.newScheduledThreadPool(3);
+    }
+
+    /**
+     *
+     * @param bundles
+     * @return
+     */
+    protected Collection<Bundle> chooseBundlesToDestroy(Set<Bundle> bundles) {
+        return null;
+    }
+
+
+    public void bundleChanged(BundleEvent event) {
+        Bundle bundle = event.getBundle();
+        if (bundle.getState() != Bundle.ACTIVE && bundle.getState() != Bundle.STARTING) {
+            // The bundle is not in STARTING or ACTIVE state anymore
+            // so destroy the context.  Ignore our own bundle since it
+            // needs to kick the orderly shutdown.
+            if (bundle != this.context.getBundle()) {
+                destroyExtension(bundle);
+            }
+        }
+    }
+
+    public Object addingBundle(Bundle bundle, BundleEvent event) {
+        modifiedBundle(bundle, event, bundle);
+        return bundle;
+    }
+
+    public void modifiedBundle(Bundle bundle, BundleEvent event, Object object) {
+        // If the bundle being stopped is the system bundle,
+        // do an orderly shutdown of all blueprint contexts now
+        // so that service usage can actually be useful
+        if (bundle.getBundleId() == 0 && bundle.getState() == Bundle.STOPPING) {
+            if (preemptiveShutdown) {
+                try {
+                    stop(context);
+                } catch (Exception e) {
+                    error("Error while performing preemptive shutdown", e);
+                }
+                return;
+            }
+        }
+        if (bundle.getState() != Bundle.ACTIVE && bundle.getState() != Bundle.STARTING) {
+            // The bundle is not in STARTING or ACTIVE state anymore
+            // so destroy the context.  Ignore our own bundle since it
+            // needs to kick the orderly shutdown and not unregister the namespaces.
+            if (bundle != this.context.getBundle()) {
+                destroyExtension(bundle);
+            }
+            return;
+        }
+        // Do not track bundles given we are stopping
+        if (stopping) {
+            return;
+        }
+        // For starting bundles, ensure, it's a lazy activation,
+        // else we'll wait for the bundle to become ACTIVE
+        if (bundle.getState() == Bundle.STARTING) {
+            String activationPolicyHeader = (String) bundle.getHeaders().get(Constants.BUNDLE_ACTIVATIONPOLICY);
+            if (activationPolicyHeader == null || !activationPolicyHeader.startsWith(Constants.ACTIVATION_LAZY)) {
+                // Do not track this bundle yet
+                return;
+            }
+        }
+        createExtension(bundle);
+    }
+
+    public void removedBundle(Bundle bundle, BundleEvent event, Object object) {
+        // Nothing to do
+        destroyExtension(bundle);
+    }
+
+    private void createExtension(final Bundle bundle) {
+        try {
+            BundleContext bundleContext = bundle.getBundleContext();
+            if (bundleContext == null) {
+                // The bundle has been stopped in the mean time
+                return;
+            }
+            final Extension extension = doCreateExtension(bundle);
+            if (extension == null) {
+                // This bundle is not to be extended
+                return;
+            }
+            synchronized (extensions) {
+                if (extensions.putIfAbsent(bundle, extension) != null) {
+                    return;
+                }
+            }
+            if (synchronous) {
+                debug(bundle, "Starting extension synchronously");
+                extension.start();
+            } else {
+                debug(bundle, "Scheduling asynchronous start of extension");
+                getExecutors().submit(new Runnable() {
+                    public void run() {
+                        try {
+                            extension.start();
+                        } catch (Exception e) {
+                            warn(bundle, "Error starting extension", e);
+                        }
+                    }
+                });
+            }
+        } catch (Throwable t) {
+            warn(bundle, "Error while creating extension", t);
+        }
+    }
+
+    private void destroyExtension(final Bundle bundle) {
+        FutureTask future;
+        synchronized (extensions) {
+            debug(bundle, "Starting destruction process");
+            future = destroying.get(bundle);
+            if (future == null) {
+                final Extension extension = extensions.remove(bundle);
+                if (extension != null) {
+                    debug(bundle, "Scheduling extension destruction");
+                    future = new FutureTask<Void>(new Runnable() {
+                        public void run() {
+                            debug(bundle, "Destroying extension");
+                            try {
+                                extension.destroy();
+                            } catch (Exception e) {
+                                warn(bundle, "Error while destroying extension", e);
+                            } finally {
+                                debug(bundle, "Finished destroying extension");
+                                synchronized (extensions) {
+                                    destroying.remove(bundle);
+                                }
+                            }
+                        }
+                    }, null);
+                    destroying.put(bundle, future);
+                } else {
+                    debug(bundle, "Not an extended bundle or destruction of extension already finished");
+                }
+            } else {
+                debug(bundle, "Destruction already scheduled");
+            }
+        }
+        if (future != null) {
+            try {
+                debug(bundle, "Waiting for extension destruction");
+                future.run();
+                future.get();
+            } catch (Throwable t) {
+                warn(bundle, "Error while destroying extension", t);
+            }
+        }
+    }
+
+    /**
+     * Create the extension for the given bundle, or null if the bundle is not to be extended.
+     *
+     * @param bundle the bundle to extend
+     * @return
+     * @throws Exception
+     */
+    protected abstract Extension doCreateExtension(Bundle bundle) throws Exception;
+
+    protected abstract void debug(Bundle bundle, String msg);
+    protected abstract void warn(Bundle bundle, String msg, Throwable t);
+    protected abstract void error(String msg, Throwable t);
+
+}
diff --git a/utils/src/main/java/org/apache/felix/utils/extender/Extension.java b/utils/src/main/java/org/apache/felix/utils/extender/Extension.java
new file mode 100644
index 0000000..24a818f
--- /dev/null
+++ b/utils/src/main/java/org/apache/felix/utils/extender/Extension.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2013 Guillaume Nodet.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ *
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.felix.utils.extender;
+
+/**
+ * A simple interface used by the extender to manage extensions.
+ */
+public interface Extension {
+
+    /**
+     * Start this extension. Starting and stopping of the extension
+     * should be synchronized.
+     */
+    void start() throws Exception;
+
+    /**
+     * Destroy should be synchronous and only return when the extension
+     * has been fully destroyed.  In addition it must be synchronized with
+     * start, because start() and destroy() can be called concurrently.
+     */
+    void destroy() throws Exception;
+
+}
diff --git a/utils/src/main/java/org/apache/felix/utils/extender/SimpleExtension.java b/utils/src/main/java/org/apache/felix/utils/extender/SimpleExtension.java
new file mode 100644
index 0000000..e5ecc05
--- /dev/null
+++ b/utils/src/main/java/org/apache/felix/utils/extender/SimpleExtension.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2013 Guillaume Nodet.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ *
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.felix.utils.extender;
+
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class SimpleExtension implements Extension {
+
+    private final Bundle bundle;
+    private final BundleContext bundleContext;
+    private final AtomicBoolean destroyed = new AtomicBoolean(false);
+
+    public SimpleExtension(Bundle bundle) {
+        this.bundle = bundle;
+        this.bundleContext = bundle.getBundleContext();
+    }
+
+    public boolean isDestroyed() {
+        synchronized (getLock()) {
+            return destroyed.get();
+        }
+    }
+
+    public void start() throws Exception {
+        synchronized (getLock()) {
+            if (destroyed.get()) {
+                return;
+            }
+            if (bundle.getState() != Bundle.ACTIVE) {
+                return;
+            }
+            if (bundle.getBundleContext() != bundleContext) {
+                return;
+            }
+            doStart();
+        }
+    }
+
+    public void destroy() throws Exception {
+        synchronized (getLock()) {
+            destroyed.set(true);
+        }
+        doDestroy();
+    }
+
+    protected Object getLock() {
+        return this;
+    }
+
+    protected abstract void doStart() throws Exception;
+
+    protected abstract void doDestroy() throws Exception;
+
+}