[FELIX-2280] To much code duplication in DefaultJDBCLock, OracleJDBCLock and MySQLJDBCLock.

Thanks Christian Müller for supplying this patch :)



git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@946719 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/karaf/main/src/main/java/org/apache/felix/karaf/main/DefaultJDBCLock.java b/karaf/main/src/main/java/org/apache/felix/karaf/main/DefaultJDBCLock.java
index 0e8505b..a331e1a 100644
--- a/karaf/main/src/main/java/org/apache/felix/karaf/main/DefaultJDBCLock.java
+++ b/karaf/main/src/main/java/org/apache/felix/karaf/main/DefaultJDBCLock.java
@@ -21,7 +21,9 @@
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Properties;
 import java.util.logging.Logger;
 
@@ -34,7 +36,8 @@
  */
 public class DefaultJDBCLock implements Lock {
 
-    private static final Logger LOG = Logger.getLogger(DefaultJDBCLock.class.getName());
+    final Logger LOG = Logger.getLogger(this.getClass().getName());
+    
     private static final String PROPERTY_LOCK_URL               = "karaf.lock.jdbc.url";
     private static final String PROPERTY_LOCK_JDBC_DRIVER       = "karaf.lock.jdbc.driver";
     private static final String PROPERTY_LOCK_JDBC_USER         = "karaf.lock.jdbc.user";
@@ -42,175 +45,259 @@
     private static final String PROPERTY_LOCK_JDBC_TABLE        = "karaf.lock.jdbc.table";
     private static final String PROPERTY_LOCK_JDBC_CLUSTERNAME  = "karaf.lock.jdbc.clustername";
     private static final String PROPERTY_LOCK_JDBC_TIMEOUT      = "karaf.lock.jdbc.timeout";
+    
+    private static final String DEFAULT_PASSWORD = "";
+    private static final String DEFAULT_USER = "";
+    private static final String DEFAULT_TABLE = "KARAF_LOCK";
+    private static final String DEFAULT_CLUSTERNAME = "karaf";
+    private static final String DEFAULT_TIMEOUT = "10"; // in seconds
 
-    private final Statements statements;
-    private Connection lockConnection;
-    private String url;
-    private String driver;
-    private String user; 
-    private String password;
-    private String table;
-    private String clusterName;
-    private int timeout;
+    final Statements statements;
+    Connection lockConnection;
+    String url;
+    String driver;
+    String user; 
+    String password;
+    String table;
+    String clusterName;
+    int timeout;
 
     public DefaultJDBCLock(Properties props) {
         LOG.addHandler(BootstrapLogManager.getDefaultHandler());
+        
         this.url = props.getProperty(PROPERTY_LOCK_URL);
         this.driver = props.getProperty(PROPERTY_LOCK_JDBC_DRIVER);
-        this.user = props.getProperty(PROPERTY_LOCK_JDBC_USER);
-        this.password = props.getProperty(PROPERTY_LOCK_JDBC_PASSWORD);
-        this.table = props.getProperty(PROPERTY_LOCK_JDBC_TABLE);
-        this.clusterName = props.getProperty(PROPERTY_LOCK_JDBC_CLUSTERNAME);
-        String time = props.getProperty(PROPERTY_LOCK_JDBC_TIMEOUT);
-        this.lockConnection = null;
-        if (table == null) { table = "KARAF_LOCK"; }
-        if ( clusterName == null) { clusterName = "karaf"; }
-        this.statements = new Statements(table, clusterName);
-        if (time != null) { 
-            this.timeout = Integer.parseInt(time) * 1000; 
-        } else {
-            this.timeout = 10000; // 10 seconds
-        }
-        if (user == null) { user = ""; }
-        if (password == null) { password = ""; }
+        this.user = props.getProperty(PROPERTY_LOCK_JDBC_USER, DEFAULT_USER);
+        this.password = props.getProperty(PROPERTY_LOCK_JDBC_PASSWORD, DEFAULT_PASSWORD);
+        this.table = props.getProperty(PROPERTY_LOCK_JDBC_TABLE, DEFAULT_TABLE);
+        this.clusterName = props.getProperty(PROPERTY_LOCK_JDBC_CLUSTERNAME, DEFAULT_CLUSTERNAME);
+        this.timeout = Integer.parseInt(props.getProperty(PROPERTY_LOCK_JDBC_TIMEOUT, DEFAULT_TIMEOUT));
+        
+        this.statements = createStatements();
+        
+        init();
     }
-
-    /**
-     * setUpdateCursor - Send Update directive to data base server.
-     *
-     * @throws Exception
-     */
-    private boolean setUpdateCursor() throws Exception {
-        PreparedStatement statement = null;
-        boolean result = false;
-        try { 
-            if ((lockConnection == null) || (lockConnection.isClosed())) { 
-                lockConnection = getConnection(driver, url, user, password);
-                lockConnection.setAutoCommit(false);
-                statements.init(lockConnection);
-            }
-            //statements.init(lockConnection);
-            String sql = statements.setUpdateCursor();
-            statement = lockConnection.prepareStatement(sql);
-            result = statement.execute();
+    
+    Statements createStatements() {
+        Statements statements = new Statements();
+        statements.setTableName(table);
+        statements.setNodeName(clusterName);
+        return statements;
+    }
+    
+    void init() {
+        try {
+            createDatabase();
+            createSchema();
         } catch (Exception e) {
-            LOG.warning("Could not obtain connection: " + e.getMessage());
-        } finally {
-            if (null != statement) {
-                try {
-                    LOG.severe("Cleaning up DB connection.");
-                    statement.close();
-                } catch (SQLException e1) {
-                    LOG.severe("Caught while closing statement: " + e1.getMessage());
-                }
-                statement = null;
-            }
+            LOG.severe("Error occured while attempting to obtain connection: " + e);
         }
-        LOG.info("Connected to data source: " + url);
-        return result;
+    }
+    
+    void createDatabase() {
+        // do nothing in the default implementation
     }
 
-    /**
-     * lock - a KeepAlive function to maintain lock. 
-     *
-     * @return true if connection lock retained, false otherwise.
+    void createSchema() {
+        if (schemaExists()) {
+            return;
+        }
+        
+        String[] createStatments = this.statements.getLockCreateSchemaStatements(getCurrentTimeMillis());
+        Statement statement = null;
+        
+        try {
+            statement = getConnection().createStatement();
+            
+            for (String stmt : createStatments) {
+                statement.execute(stmt);
+            }
+            
+            getConnection().commit();
+        } catch (Exception e) {
+            LOG.severe("Could not create schema: " + e );
+        } finally {
+            closeSafely(statement);
+        }
+    }
+
+    boolean schemaExists() {
+        ResultSet rs = null;
+        boolean schemaExists = false;
+        
+        try {
+            rs = getConnection().getMetaData().getTables(null, null, statements.getFullLockTableName(), new String[] {"TABLE"});
+            schemaExists = rs.next();
+        } catch (Exception ignore) {
+            LOG.severe("Error testing for db table: " + ignore);
+        } finally {
+            closeSafely(rs);
+        }
+        
+        return schemaExists;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.felix.karaf.main.Lock#lock()
      */
     public boolean lock() {
-        PreparedStatement statement = null;
-        boolean result = false;
+        boolean result = aquireLock();
+        
+        if (result) {
+            result = updateLock();
+        }
+        
+        return result;
+    }
+    
+    boolean aquireLock() {
+        String lockCreateStatement = statements.getLockCreateStatement();
+        PreparedStatement preparedStatement = null;
+        boolean lockAquired = false;
+        
         try {
-            if (!setUpdateCursor()) {
-                LOG.severe("Could not set DB update cursor");
-                return result;
-            }
-            long time = System.currentTimeMillis();
-            statement = lockConnection.prepareStatement(statements.getLockUpdateStatement(time));
-            int rows = statement.executeUpdate();
-            if (rows >= 1) {
-                result=true;
-            }
+            preparedStatement = getConnection().prepareStatement(lockCreateStatement);
+            preparedStatement.setQueryTimeout(timeout);
+            lockAquired = preparedStatement.execute();
         } catch (Exception e) {
-            LOG.warning("Failed to acquire database lock: " + e.getMessage());
+            LOG.warning("Failed to acquire database lock: " + e);
         }finally {
-            if (statement != null) {
+            closeSafely(preparedStatement);
+        }
+        
+        return lockAquired;
+    }
+
+    boolean updateLock() {
+        String lockUpdateStatement = statements.getLockUpdateStatement(getCurrentTimeMillis());
+        PreparedStatement preparedStatement = null;
+        boolean lockUpdated = false;
+        
+        try {
+            preparedStatement = getConnection().prepareStatement(lockUpdateStatement);
+            preparedStatement.setQueryTimeout(timeout);
+            int rows = preparedStatement.executeUpdate();
+            lockUpdated = (rows == 1);
+        } catch (Exception e) {
+            LOG.warning("Failed to update database lock: " + e);
+        }finally {
+            closeSafely(preparedStatement);
+        }
+        
+        return lockUpdated;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.felix.karaf.main.Lock#release()
+     */
+    public void release() throws Exception {
+        if (isConnected()) {
+            try {
+                getConnection().rollback();
+            } catch (SQLException e) {
+                LOG.severe("Exception while rollbacking the connection on release: " + e);
+            } finally {
                 try {
-                    statement.close();
-                } catch (SQLException e) {
-                    LOG.severe("Failed to close statement" + e);
+                    getConnection().close();
+                } catch (SQLException ignored) {
+                    LOG.fine("Exception while closing connection on release: " + ignored);
                 }
             }
         }
-        return result;
+        
+        lockConnection = null;
     }
 
-    /**
-     * release - terminate the lock connection safely.
-     */
-    public void release() throws Exception {
-        if (lockConnection != null && !lockConnection.isClosed()) {
-            lockConnection.rollback();
-            lockConnection.close();
-            lockConnection = null;
-        }
-    }
-
-    /**
-     * isAlive - test if lock still exists.
+    /*
+     * (non-Javadoc)
+     * @see org.apache.felix.karaf.main.Lock#isAlive()
      */
     public boolean isAlive() throws Exception {
-        if ((lockConnection == null) || (lockConnection.isClosed())) { 
+        if (!isConnected()) { 
             LOG.severe("Lost lock!");
             return false; 
         }
-        PreparedStatement statement = null;
-        boolean result = true;
-        try { 
-            long time = System.currentTimeMillis();
-            statement = lockConnection.prepareStatement(statements.getLockUpdateStatement(time));
-            int rows = statement.executeUpdate();
-            if (rows < 1) {
-                result = false;
-            }
-        } catch (Exception ex) {
-            LOG.severe("Error occured while testing lock: " + ex + " " + ex.getMessage());
-            return false;
-        } finally {
-            if (statement != null) {
-                try {
-                    statement.close();
-                } catch (Exception ex1) {
-                    LOG.severe("Error occured after testing lock: " + ex1.getMessage());
-                }
+
+        return updateLock();
+    }
+    
+    boolean isConnected() throws SQLException {
+        return lockConnection != null && !lockConnection.isClosed();
+    }
+    
+    void closeSafely(Statement preparedStatement) {
+        if (preparedStatement != null) {
+            try {
+                preparedStatement.close();
+            } catch (SQLException e) {
+                LOG.severe("Failed to close statement: " + e);
             }
         }
-        return result;
+    }
+    
+    void closeSafely(ResultSet rs) {
+        if (rs != null) {
+            try {
+                rs.close();
+            } catch (SQLException e) {
+                LOG.severe("Error occured while releasing ResultSet: " + e);
+            }
+        }
+    }
+    
+    Connection getConnection() throws Exception {
+        if (!isConnected()) {
+            lockConnection = createConnection(driver, url, user, password);
+            lockConnection.setAutoCommit(false);
+        }
+        
+        return lockConnection;
     }
 
     /**
-     * getConnection - Obtain connection to database via jdbc driver.
-     *
-     * @throws Exception
-     * @param driver, the JDBC driver class.
-     * @param url, url to data source.
-     * @param username, user to access data source.
-     * @param password, password for specified user.
-     * @return connection, null returned if conenction fails.
+     * Create a new jdbc connection.
+     * 
+     * @param driver
+     * @param url
+     * @param username
+     * @param password
+     * @return a new jdbc connection
+     * @throws Exception 
      */
-    private Connection getConnection(String driver, String url, 
-                                     String username, String password) throws Exception {
-        Connection conn = null;
+    Connection createConnection(String driver, String url, String username, String password) throws Exception {
+        if (url.toLowerCase().startsWith("jdbc:derby")) {
+            url = (url.toLowerCase().contains("create=true")) ? url : url + ";create=true";
+        }
+        
         try {
-            Class.forName(driver);
-            if (url.startsWith("jdbc:derby:")) {
-                conn = DriverManager.getConnection(url + ";create=true", username, password);
-            } else {
-                conn = DriverManager.getConnection(url, username, password);
-            }
+            return doCreateConnection(driver, url, username, password);
         } catch (Exception e) {
             LOG.severe("Error occured while setting up JDBC connection: " + e);
             throw e; 
         }
-        return conn;
     }
 
-}
+    /**
+     * This method could be used to inject a mock jdbc connection for testing purposes.
+     * 
+     * @param driver
+     * @param url
+     * @param username
+     * @param password
+     * @return
+     * @throws ClassNotFoundException
+     * @throws SQLException
+     */
+    Connection doCreateConnection(String driver, String url, String username, String password) throws ClassNotFoundException, SQLException {
+        Class.forName(driver);
+        // results in a closed connection in Derby if the update lock table request timed out
+        // DriverManager.setLoginTimeout(timeout);
+        return DriverManager.getConnection(url, username, password);
+    }
+    
+    long getCurrentTimeMillis() {
+        return System.currentTimeMillis();
+    }
+}
\ No newline at end of file
diff --git a/karaf/main/src/main/java/org/apache/felix/karaf/main/DerbyJDBCLock.java b/karaf/main/src/main/java/org/apache/felix/karaf/main/DerbyJDBCLock.java
new file mode 100644
index 0000000..9d8f662
--- /dev/null
+++ b/karaf/main/src/main/java/org/apache/felix/karaf/main/DerbyJDBCLock.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.karaf.main;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+/**
+ * Represents an exclusive lock on a database,
+ * used to avoid multiple Karaf instances attempting
+ * to become master.
+ * 
+ * @version $Revision: $
+ */
+public class DerbyJDBCLock extends DefaultJDBCLock {
+
+    public DerbyJDBCLock(Properties props) {
+        super(props);
+    }
+
+    @Override
+    Connection createConnection(String driver, String url, String username, String password) throws Exception {
+        url = (url.toLowerCase().contains("create=true")) ? url : url + ";create=true";
+        
+        return super.createConnection(driver, url, username, password);
+    }
+}
\ No newline at end of file
diff --git a/karaf/main/src/main/java/org/apache/felix/karaf/main/Lock.java b/karaf/main/src/main/java/org/apache/felix/karaf/main/Lock.java
index 02423a2..7750fe7 100644
--- a/karaf/main/src/main/java/org/apache/felix/karaf/main/Lock.java
+++ b/karaf/main/src/main/java/org/apache/felix/karaf/main/Lock.java
@@ -20,10 +20,27 @@
 
 public interface Lock {
 
+    /**
+     * A KeepAlive function to maintain the lock. 
+     * Indicates whether or not the lock could be aquired.
+     * 
+     * @return true if connection lock retained, false otherwise.
+     * @throws Exception
+     */
     boolean lock() throws Exception;
 
+    /**
+     * Terminate the lock connection safely.
+     * 
+     * @throws Exception
+     */
     void release() throws Exception;
 
+    /**
+     * Indicates whether or not the lock still exists.
+     * 
+     * @return true, if the lock still exists, otherwise false.
+     * @throws Exception
+     */
     boolean isAlive() throws Exception;
-
-}
+}
\ No newline at end of file
diff --git a/karaf/main/src/main/java/org/apache/felix/karaf/main/MySQLJDBCLock.java b/karaf/main/src/main/java/org/apache/felix/karaf/main/MySQLJDBCLock.java
index 5944929..86cc83f 100644
--- a/karaf/main/src/main/java/org/apache/felix/karaf/main/MySQLJDBCLock.java
+++ b/karaf/main/src/main/java/org/apache/felix/karaf/main/MySQLJDBCLock.java
@@ -19,11 +19,7 @@
 package org.apache.felix.karaf.main;
 
 import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
 import java.util.Properties;
-import java.util.logging.Logger;
 
 /**
  * Represents an exclusive lock on a database,
@@ -32,213 +28,33 @@
  * 
  * @version $Revision: $
  */
-public class MySQLJDBCLock implements Lock {
-
-    private static final Logger LOG = Logger.getLogger(MySQLJDBCLock.class.getName());
-    private static final String PROPERTY_LOCK_URL               = "karaf.lock.jdbc.url";
-    private static final String PROPERTY_LOCK_JDBC_DRIVER       = "karaf.lock.jdbc.driver";
-    private static final String PROPERTY_LOCK_JDBC_USER         = "karaf.lock.jdbc.user";
-    private static final String PROPERTY_LOCK_JDBC_PASSWORD     = "karaf.lock.jdbc.password";
-    private static final String PROPERTY_LOCK_JDBC_TABLE        = "karaf.lock.jdbc.table";
-    private static final String PROPERTY_LOCK_JDBC_CLUSTERNAME  = "karaf.lock.jdbc.clustername";
-    private static final String PROPERTY_LOCK_JDBC_TIMEOUT      = "karaf.lock.jdbc.timeout";
-
-    private final Statements statements;
-    private Connection lockConnection;
-    private String url;
-    private String database;
-    private String driver;
-    private String user; 
-    private String password;
-    private String table;
-    private String clusterName;
-    private int timeout;
+public class MySQLJDBCLock extends DefaultJDBCLock {
 
     public MySQLJDBCLock(Properties props) {
-        LOG.addHandler(BootstrapLogManager.getDefaultHandler());
-        this.url = props.getProperty(PROPERTY_LOCK_URL);
-        this.driver = props.getProperty(PROPERTY_LOCK_JDBC_DRIVER);
-        this.user = props.getProperty(PROPERTY_LOCK_JDBC_USER);
-        this.password = props.getProperty(PROPERTY_LOCK_JDBC_PASSWORD);
-        this.table = props.getProperty(PROPERTY_LOCK_JDBC_TABLE);
-        this.clusterName = props.getProperty(PROPERTY_LOCK_JDBC_CLUSTERNAME);
-        String time = props.getProperty(PROPERTY_LOCK_JDBC_TIMEOUT);
-        this.lockConnection = null;
-        if (table == null) { table = "KARAF_LOCK"; }
-        if ( clusterName == null) { clusterName = "karaf"; }
-        if (time != null) { 
-            this.timeout = Integer.parseInt(time) * 1000; 
-        } else {
-            this.timeout = 10000; // 10 seconds
-        }
-        if (user == null) { user = ""; }
-        if (password == null) { password = ""; }
-
-        int db = props.getProperty(PROPERTY_LOCK_URL).lastIndexOf("/");
-        this.url = props.getProperty(PROPERTY_LOCK_URL).substring(0, db);
-        this.database = props.getProperty(PROPERTY_LOCK_URL).substring(db +1);
-        this.statements = new Statements(database, table, clusterName);
-        testDB();
+        super(props);
     }
 
-    /**
-     * testDB - ensure specified database exists.
-     *
-     */
-    private void testDB() {
-        try {
-            lockConnection = getConnection(driver, url, user, password);
-            lockConnection.setAutoCommit(false);
-            statements.init(lockConnection, database);
-        } catch (Exception e) {
-            LOG.severe("Error occured while attempting to obtain connection: " + e + " " + e.getMessage());
-        } finally {
-            try {
-                lockConnection.close();
-                lockConnection = null;
-            } catch (Exception f) {
-                LOG.severe("Error occured while cleaning up connection: " + f + " " + f.getMessage());
+    Statements createStatements() {
+        Statements statements = new Statements();
+        statements.setTableName(table);
+        statements.setNodeName(clusterName);
+        String[] lockCreateSchemaStatements = statements.getLockCreateSchemaStatements(getCurrentTimeMillis());
+        for (int index = 0; index < lockCreateSchemaStatements.length; index++) {
+            if (lockCreateSchemaStatements[index].toUpperCase().startsWith("CREATE TABLE")) {
+                lockCreateSchemaStatements[index] = lockCreateSchemaStatements[index] + " ENGINE=INNODB";
             }
         }
+        return statements;
     }
-
-    /**
-     * setUpdateCursor - Send Update directive to data base server.
-     *
-     * @throws Exception
-     */
-    private boolean setUpdateCursor() throws Exception {
-        PreparedStatement statement = null;
-        boolean result = false;
-        try { 
-            if ((lockConnection == null) || (lockConnection.isClosed())) { 
-                LOG.fine("DefaultJDBCLock#setUpdateCursor:: connection: " + url + "/" + database );
-                lockConnection = getConnection(driver, url + "/" + database, user, password);
-                lockConnection.setAutoCommit(false);
-                statements.init(lockConnection);
-            } else {
-                LOG.fine("MySQLJDBCLock#setUpdateCursor:: connection already established.");
-                return true; 
-            }
-            String sql = "LOCK TABLES " + database + "." + table + " WRITE";
-            statement = lockConnection.prepareStatement(sql);
-            result = statement.execute();
-        } catch (Exception e) {
-            LOG.warning("Could not obtain connection: " + e.getMessage());
-        } finally {
-            if (null != statement) {
-                try {
-                    LOG.severe("Cleaning up DB connection.");
-                    statement.close();
-                } catch (SQLException e1) {
-                    LOG.severe("Caught while closing statement: " + e1.getMessage());
-                }
-                statement = null;
-            }
-        }
-        LOG.info("Connected to data source: " + url + " With RS: " + result);
-        return result;
+    
+    @Override
+    Connection createConnection(String driver, String url, String username, String password) throws Exception {
+        url = (url.toLowerCase().contains("createDatabaseIfNotExist=true")) ? 
+            url : 
+            ((url.contains("?")) ? 
+                url + "&createDatabaseIfNotExist=true" : 
+                url + "?createDatabaseIfNotExist=true");
+        
+        return super.createConnection(driver, url, username, password);
     }
-
-    /**
-     * lock - a KeepAlive function to maintain lock. 
-     *
-     * @return true if connection lock retained, false otherwise.
-     */
-    public boolean lock() {
-        PreparedStatement statement = null;
-        boolean result = false;
-        try {
-            if (!setUpdateCursor()) {
-                LOG.severe("Could not set DB update cursor");
-                return result;
-            }
-            LOG.fine("MySQLJDBCLock#lock:: have set Update Cursor, now do update");
-            long time = System.currentTimeMillis();
-            statement = lockConnection.prepareStatement(statements.getLockUpdateStatement(time));
-            int rows = statement.executeUpdate();
-            LOG.fine("MySQLJDBCLock#lock:: Number of update rows: " + rows);
-            if (rows >= 1) {
-                result=true;
-            }
-        } catch (Exception e) {
-            LOG.warning("Failed to acquire database lock: " + e.getMessage());
-        }finally {
-            if (statement != null) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                    LOG.severe("Failed to close statement" + e);
-                }
-            }
-        }
-        return result;
-    }
-
-    /**
-     * release - terminate the lock connection safely.
-     */
-    public void release() throws Exception {
-        if (lockConnection != null && !lockConnection.isClosed()) {
-            lockConnection.rollback();
-            lockConnection.close();
-            lockConnection = null;
-        }
-    }
-
-    /**
-     * isAlive - test if lock still exists.
-     */
-    public boolean isAlive() throws Exception {
-        if ((lockConnection == null) || (lockConnection.isClosed())) { 
-            LOG.severe("Lost lock!");
-            return false; 
-        }
-        PreparedStatement statement = null;
-        boolean result = true;
-        try { 
-            long time = System.currentTimeMillis();
-            statement = lockConnection.prepareStatement(statements.getLockUpdateStatement(time));
-            int rows = statement.executeUpdate();
-            if (rows < 1) {
-                result = false;
-            }
-        } catch (Exception ex) {
-            LOG.severe("Error occured while testing lock: " + ex + " " + ex.getMessage());
-            return false;
-        } finally {
-            if (statement != null) {
-                try {
-                    statement.close();
-                } catch (Exception ex1) {
-                    LOG.severe("Error occured after testing lock: " + ex1.getMessage());
-                }
-            }
-        }
-        return result;
-    }
-
-    /**
-     * getConnection - Obtain connection to database via jdbc driver.
-     *
-     * @throws Exception
-     * @param driver, the JDBC driver class.
-     * @param url, url to data source.
-     * @param username, user to access data source.
-     * @param password, password for specified user.
-     * @return connection, null returned if conenction fails.
-     */
-    private Connection getConnection(String driver, String url, 
-                                     String username, String password) throws Exception {
-        Connection conn = null;
-        try {
-            Class.forName(driver);
-            conn = DriverManager.getConnection(url, username, password);
-        } catch (Exception e) {
-            LOG.severe("Error occured while setting up JDBC connection: " + e);
-            throw e; 
-        }
-        return conn;
-    }
-
-}
+}
\ No newline at end of file
diff --git a/karaf/main/src/main/java/org/apache/felix/karaf/main/OracleJDBCLock.java b/karaf/main/src/main/java/org/apache/felix/karaf/main/OracleJDBCLock.java
index ff9b9cb..bcf72f7 100644
--- a/karaf/main/src/main/java/org/apache/felix/karaf/main/OracleJDBCLock.java
+++ b/karaf/main/src/main/java/org/apache/felix/karaf/main/OracleJDBCLock.java
@@ -18,12 +18,7 @@
  */
 package org.apache.felix.karaf.main;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
 import java.util.Properties;
-import java.util.logging.Logger;
 
 /**
  * Represents an exclusive lock on a database,
@@ -32,210 +27,44 @@
  * 
  * @version $Revision: $
  */
-public class OracleJDBCLock implements Lock {
-
-    private static final Logger LOG = Logger.getLogger(OracleJDBCLock.class.getName());
-    private static final String PROPERTY_LOCK_URL               = "karaf.lock.jdbc.url";
-    private static final String PROPERTY_LOCK_JDBC_DRIVER       = "karaf.lock.jdbc.driver";
-    private static final String PROPERTY_LOCK_JDBC_USER         = "karaf.lock.jdbc.user";
-    private static final String PROPERTY_LOCK_JDBC_PASSWORD     = "karaf.lock.jdbc.password";
-    private static final String PROPERTY_LOCK_JDBC_TABLE        = "karaf.lock.jdbc.table";
-    private static final String PROPERTY_LOCK_JDBC_CLUSTERNAME  = "karaf.lock.jdbc.clustername";
-    private static final String PROPERTY_LOCK_JDBC_TIMEOUT      = "karaf.lock.jdbc.timeout";
-
-    private final Statements statements;
-    private Connection lockConnection;
-    private String url;
-    private String database;
-    private String driver;
-    private String user; 
-    private String password;
-    private String table;
-    private String clusterName;
-    private int timeout;
+public class OracleJDBCLock extends DefaultJDBCLock {
+    
+    private static final String MOMENT_COLUMN_DATA_TYPE = "NUMBER(20)";
 
     public OracleJDBCLock(Properties props) {
-        LOG.addHandler(BootstrapLogManager.getDefaultHandler());
-        this.url = props.getProperty(PROPERTY_LOCK_URL);
-        this.driver = props.getProperty(PROPERTY_LOCK_JDBC_DRIVER);
-        this.user = props.getProperty(PROPERTY_LOCK_JDBC_USER);
-        this.password = props.getProperty(PROPERTY_LOCK_JDBC_PASSWORD);
-        this.table = props.getProperty(PROPERTY_LOCK_JDBC_TABLE);
-        this.clusterName = props.getProperty(PROPERTY_LOCK_JDBC_CLUSTERNAME);
-        String time = props.getProperty(PROPERTY_LOCK_JDBC_TIMEOUT);
-        this.lockConnection = null;
-        if (table == null) { table = "KARAF_LOCK"; }
-        if ( clusterName == null) { clusterName = "karaf"; }
-        if (time != null) { 
-            this.timeout = Integer.parseInt(time) * 1000; 
-        } else {
-            this.timeout = 10000; // 10 seconds
-        }
-        if (user == null) { user = ""; }
-        if (password == null) { password = ""; }
-
-        int db = props.getProperty(PROPERTY_LOCK_URL).lastIndexOf(":");
-        this.url = props.getProperty(PROPERTY_LOCK_URL);
-        this.database = props.getProperty(PROPERTY_LOCK_URL).substring(db +1);
-        this.statements = new Statements(database, table, clusterName);
-        statements.setDBCreateStatement("create database " + database);
-        statements.setCreateStatement("create table " + table + " (MOMENT number(20), NODE varchar2(20))");
-        statements.setPopulateStatement("insert into " + table + " (MOMENT, NODE) values ('1', '" + clusterName + "')");
-        statements.setColumnNames("MOMENT", "NODE");
-        testDB();
+        super(props);
     }
 
-    /**
-     * testDB - ensure specified database exists.
-     *
-     */
-    private void testDB() {
-        try {
-            lockConnection = getConnection(driver, url, user, password);
-            lockConnection.setAutoCommit(false);
-            statements.init(lockConnection);
-        } catch (Exception e) {
-            LOG.severe("Error occured while attempting to obtain connection: " + e + " " + e.getMessage());
-        } finally {
-            try {
-                lockConnection.close();
-                lockConnection = null;
-            } catch (Exception f) {
-                LOG.severe("Error occured while cleaning up connection: " + f + " " + f.getMessage());
-            }
-        }
+    @Override
+    Statements createStatements() {
+        Statements statements = new Statements();
+        statements.setTableName(table);
+        statements.setNodeName(clusterName);
+        statements.setMomentColumnDataType(MOMENT_COLUMN_DATA_TYPE);
+        return statements;
     }
-
+    
     /**
-     * setUpdateCursor - Send Update directive to data base server.
-     *
-     * @throws Exception
+     * When we perform an update on a long lived locked table, Oracle will save
+     * a copy of the transaction in it's UNDO table space. Eventually this can
+     * cause the UNDO table to become full, disrupting all locks in the DB instance.
+     * A select query just touches the table, ensuring we can still read the DB but
+     * doesn't add to the UNDO. 
      */
-    private boolean setUpdateCursor() throws Exception {
-        PreparedStatement statement = null;
-        boolean result = false;
-        try { 
-            if ((lockConnection == null) || (lockConnection.isClosed())) { 
-                LOG.fine("OracleJDBCLock#setUpdateCursor:: connection: " + url);
-                lockConnection = getConnection(driver, url, user, password);
-                lockConnection.setAutoCommit(false);
-                statements.init(lockConnection);
-            } else {
-                LOG.fine("OracleJDBCLock#setUpdateCursor:: connection already established.");
-                return true; 
-            }
-            String sql = "SELECT * FROM " + table + " FOR UPDATE NOWAIT";
-            statement = lockConnection.prepareStatement(sql);
-            result = statement.execute();
-        } catch (Exception e) {
-            LOG.warning("Could not obtain connection: " + e.getMessage());
-            lockConnection.close();
-            lockConnection = null;
-        } finally {
-            if (null != statement) {
-                try {
-                    LOG.severe("Cleaning up DB connection.");
-                    statement.close();
-                } catch (SQLException e1) {
-                    LOG.severe("Caught while closing statement: " + e1.getMessage());
-                }
-                statement = null;
-            }
-        }
-        LOG.fine("Connected to data source: " + url + " With RS: " + result);
-        return result;
-    }
-
-    /**
-     * lock - a KeepAlive function to maintain lock. 
-     *
-     * @return true if connection lock retained, false otherwise.
-     */
+    @Override
     public boolean lock() {
-        PreparedStatement statement = null;
-        boolean result = false;
-        try {
-            if (!setUpdateCursor()) {
-                LOG.severe("Could not set DB update cursor");
-                return result;
-            }
-            LOG.fine("OracleJDBCLock#lock:: have set Update Cursor, now perform query");
-            String up = "SELECT * FROM " + table;
-            statement = lockConnection.prepareStatement(up);
-            return statement.execute();
-        } catch (Exception e) {
-            LOG.warning("Failed to acquire database lock: " + e.getMessage());
-        }finally {
-            if (statement != null) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                    LOG.severe("Failed to close statement" + e);
-                }
-            }
-        }
-        return result;
+        return aquireLock();
     }
-
+    
     /**
-     * release - terminate the lock connection safely.
+     * When we perform an update on a long lived locked table, Oracle will save
+     * a copy of the transaction in it's UNDO table space. Eventually this can
+     * cause the UNDO table to become full, disrupting all locks in the DB instance.
+     * A select query just touches the table, ensuring we can still read the DB but
+     * doesn't add to the UNDO. 
      */
-    public void release() throws Exception {
-        if (lockConnection != null && !lockConnection.isClosed()) {
-            lockConnection.rollback();
-            lockConnection.close();
-            lockConnection = null;
-        }
+    @Override
+    boolean updateLock() {
+        return aquireLock();
     }
-
-    /**
-     * isAlive - test if lock still exists.
-     */
-    public boolean isAlive() throws Exception {
-        if ((lockConnection == null) || (lockConnection.isClosed())) { 
-            LOG.severe("Lost lock!");
-            return false; 
-        }
-        PreparedStatement statement = null;
-        try {
-            String up = "SELECT * FROM " + table;
-            statement = lockConnection.prepareStatement(up);
-            return statement.execute();
-        } catch (Exception e) {
-            LOG.warning("Failed to access database. " + e.getMessage());
-        } finally {
-            if (statement != null) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                    LOG.severe("Failed to close statement" + e);
-                }
-            }
-        }
-        return false;
-    }
-
-    /**
-     * getConnection - Obtain connection to database via jdbc driver.
-     *
-     * @throws Exception
-     * @param driver, the JDBC driver class.
-     * @param url, url to data source.
-     * @param username, user to access data source.
-     * @param password, password for specified user.
-     * @return connection, null returned if conenction fails.
-     */
-    private Connection getConnection(String driver, String url, 
-                                     String username, String password) throws Exception {
-        Connection conn = null;
-        try {
-            Class.forName(driver);
-            conn = DriverManager.getConnection(url, username, password);
-        } catch (Exception e) {
-            LOG.severe("Error occured while setting up JDBC connection: " + e);
-            throw e; 
-        }
-        return conn;
-    }
-
-}
+}
\ No newline at end of file
diff --git a/karaf/main/src/main/java/org/apache/felix/karaf/main/Statements.java b/karaf/main/src/main/java/org/apache/felix/karaf/main/Statements.java
index 44dbc09..251d6c4 100644
--- a/karaf/main/src/main/java/org/apache/felix/karaf/main/Statements.java
+++ b/karaf/main/src/main/java/org/apache/felix/karaf/main/Statements.java
@@ -18,149 +18,100 @@
  */
 package org.apache.felix.karaf.main;
 
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.logging.Logger;
 
 public class Statements {
-
-    private static Logger LOG = Logger.getLogger(Statements.class.getName());
-    private String lockTableName = "KARAF_LOCK";
-    private String clusterName = "karaf";
-    private String dbName = "sample";
-    private String time = "TIME";
-    private String cluster = "CLUSTER";
+    
+    protected String tablePrefix = "";
+    protected String tableName = "KARAF_LOCK";
+    protected String nodeName = "karaf";
+    protected String momentColumnDataType = "BIGINT";
+    protected String nodeColumnDataType = "VARCHAR(20)";
+    
+    private String[] lockCreateSchemaStatements;
     private String lockCreateStatement;
-    private String lockDBCreateStatement;
-    private String lockPopulateStatement;
-
-    public Statements(String tableName, String clusterName) {
-        LOG.addHandler( BootstrapLogManager.getDefaultHandler() );
-        
-        this.lockTableName = tableName; 
-        this.clusterName = clusterName;
-        this.lockCreateStatement="create table " + lockTableName + " (TIME bigint, CLUSTER varchar(20))";
-        this.lockPopulateStatement="insert into " + lockTableName + " (TIME, CLUSTER) values (1, '" + clusterName + "')";
+    private String lockUpdateStatement;
+    
+    public String[] getLockCreateSchemaStatements(long moment) {
+        if (lockCreateSchemaStatements == null) {
+            lockCreateSchemaStatements = new String[] {
+                "CREATE TABLE " + getFullLockTableName() + " (MOMENT " + getMomentColumnDataType() + ", NODE " + getNodeColumnDataType() + ")",
+                "INSERT INTO " + getFullLockTableName() + " (MOMENT, NODE) VALUES (" + moment + ", '" + getNodeName() + "')", 
+            };
+        }
+        return lockCreateSchemaStatements;
     }
-
-    public Statements(String dbName, String tableName, String clusterName) {
-        LOG.addHandler( BootstrapLogManager.getDefaultHandler() );
-        
-        this.dbName = dbName;
-        this.lockTableName = tableName;
-        this.clusterName = clusterName;
-        this.lockDBCreateStatement="create database if not exists " + dbName;
-        this.lockCreateStatement="create table " + lockTableName + " (TIME bigint, CLUSTER varchar(20)) ENGINE = INNODB";
-        this.lockPopulateStatement="insert into " + lockTableName + " (TIME, CLUSTER) values (1, '" + clusterName + "')";
+    
+    public void setLockCreateSchemaStatements(String[] lockCreateSchemaStatements) {
+        this.lockCreateSchemaStatements = lockCreateSchemaStatements;
     }
-
-    public void setDBCreateStatement(String createDB) {
-        this.lockDBCreateStatement = createDB;
+    
+    public String getLockCreateStatement() {
+        if (lockCreateStatement == null) {
+            lockCreateStatement = "SELECT * FROM " + getFullLockTableName() + " FOR UPDATE";
+        }
+        return lockCreateStatement;
     }
-
-    public void setCreateStatement(String createTable) {
-        this.lockCreateStatement = createTable;
+    
+    public void setLockCreateStatement(String lockCreateStatement) {
+        this.lockCreateStatement = lockCreateStatement;
     }
-
-    public void setPopulateStatement(String popTable) {
-        this.lockPopulateStatement = popTable;
-    }
- 
-    public void setColumnNames(String time, String cluster) {
-        this.time = time;
-        this.cluster = cluster; 
-    } 
-
-    public String setUpdateCursor() {
-        String test = "SELECT * FROM " + lockTableName + " FOR UPDATE";
-        return test;
-    }
-
-    public String getLockUpdateStatement(long timeStamp) {
-        String lockUpdateStatement = "";
-        lockUpdateStatement = "UPDATE " + lockTableName + 
-                              " SET " + this.time + "=" + timeStamp + 
-                              " WHERE " + this.cluster + " = '" + clusterName + "'";
+    
+    public String getLockUpdateStatement(long moment) {
+        if (lockUpdateStatement == null) {
+            lockUpdateStatement = "UPDATE " + getFullLockTableName() + " SET MOMENT = " + moment;
+        }
         return lockUpdateStatement;
     }
-
-    public void init (Connection lockConnection, String dbName) {
-        Statement s = null;
-        try {
-            s = lockConnection.createStatement();
-            s.execute(lockDBCreateStatement);
-        } catch (SQLException e) {
-            LOG.severe("SQL Exception: " + e +
-                      " " + e.getMessage());
-        } catch (Exception ignore) {
-            LOG.severe("Could not create database: " + ignore +
-                      " " + ignore.getMessage());
-        } finally {
-            try {
-                s.close();
-            } catch (Throwable e) {
-                // ignore
-            }
-        }
+    
+    public void setLockUpdateStatement(String lockUpdateStatement) {
+        this.lockUpdateStatement = lockUpdateStatement;
     }
 
-    /**
-     * init - initialize db
-     */
-    public void init (Connection lockConnection) {
-        Statement s = null;
-        try {
-            // Check to see if the table already exists. If it does, then don't
-            // log warnings during startup.
-            // Need to run the scripts anyways since they may contain ALTER
-            // statements that upgrade a previous version
-            // of the table
-            boolean alreadyExists = false;
-            ResultSet rs = null;
-            try {
-                rs = lockConnection.getMetaData().getTables(null, null, lockTableName, new String[] {"TABLE"});
-                alreadyExists = rs.next();
-            } catch (Throwable ignore) {
-                LOG.severe("Error testing for db table: " + ignore);
-            } finally {
-                close(rs);
-            }
-            if (alreadyExists) {
-                return;
-            }
-            s = lockConnection.createStatement();
-            String[] createStatments = {lockCreateStatement, lockPopulateStatement};
-            for (int i = 0; i < createStatments.length; i++) {
-                // This will fail usually since the tables will be
-                // created already.
-                try {
-                    s.execute(createStatments[i]);
-                } catch (SQLException e) {
-                    LOG.severe("Could not create JDBC tables; they could already exist."
-                             + " Failure was: " + createStatments[i] + " Message: " + e.getMessage()
-                             + " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode());
-                }
-            }
-            lockConnection.commit();
-        } catch (Exception ignore) {
-            LOG.severe("Error occured during initialization: " + ignore);
-        } finally {
-            try {
-                if (s != null) { s.close(); }
-            } catch (Throwable e) {
-                LOG.severe("Error occured while closing connection: " + e);
-            }
-        }
+    long getCurrentTimeMillis() {
+        return System.currentTimeMillis();
     }
 
-    private static void close(ResultSet rs) {
-        try {
-            rs.close();
-        } catch (Throwable e) {
-            LOG.severe("Error occured while releasing ResultSet: " + e);
-        }
+    public String getFullLockTableName() {
+        return getTablePrefix() + getTableName();
+    }
+    
+    public void setMomentColumnDataType(String momentColumnDataType) {
+        this.momentColumnDataType = momentColumnDataType;
+    }
+    
+    public String getMomentColumnDataType() {
+        return momentColumnDataType;
     }
 
-}
+    public String getNodeName() {
+        return nodeName;
+    }
+
+    public void setNodeName(String nodeName) {
+        this.nodeName = nodeName;
+    }
+
+    public String getNodeColumnDataType() {
+        return nodeColumnDataType;
+    }
+
+    public void setNodeColumnDataType(String nodeColumnDataType) {
+        this.nodeColumnDataType = nodeColumnDataType;
+    }
+
+    public String getTablePrefix() {
+        return tablePrefix;
+    }
+
+    public void setTablePrefix(String tablePrefix) {
+        this.tablePrefix = tablePrefix;
+    }
+    
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+}
\ No newline at end of file
diff --git a/karaf/main/src/test/java/org/apache/felix/karaf/main/BaseJDBCLockIntegrationTest.java b/karaf/main/src/test/java/org/apache/felix/karaf/main/BaseJDBCLockIntegrationTest.java
new file mode 100644
index 0000000..eae8d19
--- /dev/null
+++ b/karaf/main/src/test/java/org/apache/felix/karaf/main/BaseJDBCLockIntegrationTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.karaf.main;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public abstract class BaseJDBCLockIntegrationTest {
+    
+    private final Logger LOG = Logger.getLogger(this.getClass().getName());
+
+    DefaultJDBCLock lock;
+    Properties props;
+    String user = "root";
+    String password = "";
+    String driver;
+    String url;
+    String tableName = "LOCK_TABLE";
+    String clustername = "karaf_cluster";
+    int timeout = 10;
+    String momentDatatype = "BIGINT";
+    String nodeDatatype = "VARCHAR(20)";
+
+    abstract DefaultJDBCLock createLock(Properties props);
+    
+    @BeforeClass
+    public static void setUpTestSuite() {
+        Properties properties = new Properties();
+        properties.put("karaf.bootstrap.log", "target/karaf.log");
+        BootstrapLogManager.setProperties(properties);        
+    }
+    
+    @Before
+    public void setUp() throws Exception {
+        props = new Properties();
+        props.put("karaf.lock.jdbc.url", url);
+        props.put("karaf.lock.jdbc.driver", driver);
+        props.put("karaf.lock.jdbc.user", user);
+        props.put("karaf.lock.jdbc.password", password);
+        props.put("karaf.lock.jdbc.table", tableName);
+        props.put("karaf.lock.jdbc.clustername", clustername);
+        props.put("karaf.lock.jdbc.timeout", String.valueOf(timeout));
+        
+        try {
+            executeStatement("DROP TABLE " + tableName);
+        } catch (Exception e) {
+            // expected if the table dosn't exist
+        }
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        if (lock != null) {
+            lock.release();
+        }
+    }
+    
+    @Test
+    @Ignore
+    public void lockShouldRestoreTheLockAfterADbFailure() throws Exception {
+        Lock lock1 = createLock(props);
+        assertTrue(lock1.lock());
+        assertTrue(lock1.isAlive());
+        
+        // shut down the database
+        
+        assertFalse(lock1.isAlive());
+        
+        // start the database
+        
+        assertTrue(lock1.lock());
+        assertTrue(lock1.isAlive());
+    }
+    
+    @Test
+    public void initShouldCreateTheSchemaIfItNotExists() throws Exception {
+        long start = System.currentTimeMillis();
+        lock = createLock(props);
+        long end = System.currentTimeMillis();
+        
+        long moment = queryDatabaseSingleResult("SELECT MOMENT FROM " + tableName);
+    
+        assertTrue(moment >= start);
+        assertTrue(moment <= end);
+    }
+
+    @Test
+    public void initShouldNotCreateTheSchemaIfItAlreadyExists() throws Exception {
+        executeStatement("CREATE TABLE " + tableName + " (MOMENT " + momentDatatype + ", NODE " + nodeDatatype + ")");
+        executeStatement("INSERT INTO " + tableName + " (MOMENT, NODE) VALUES (1, '" + clustername + "')");
+        
+        lock = createLock(props);
+        
+        long moment = queryDatabaseSingleResult("SELECT MOMENT FROM " + tableName);
+    
+        assertEquals(1, moment);
+    }
+
+    @Test
+    public void lockShouldReturnTrueItTheTableIsNotLocked() throws Exception {
+        lock = createLock(props);
+        
+        assertTrue(lock.lock());
+        assertTableIsLocked();
+    }
+
+    @Test
+    public void lockShouldReturnFalseIfAnotherRowIsLocked() throws Exception {
+        Connection connection = null;
+        try {
+            lock = createLock(props);
+            
+            executeStatement("INSERT INTO " + tableName + " (MOMENT, NODE) VALUES (1, '" + clustername + "_2')");
+            connection = lock(tableName, clustername + "_2");
+            
+            // we can't lock only one row for the cluster
+            assertFalse(lock.lock());
+        } finally {
+            close(connection);
+        }
+    }
+    
+    @Test
+    public void lockShouldReturnFalseIfTheRowIsAlreadyLocked() throws Exception {
+        Connection connection = null;
+        try {
+            lock = createLock(props);
+            connection = lock(tableName, clustername);
+            
+            assertFalse(lock.lock());
+        } finally {
+            close(connection);
+        }
+    }
+    
+    @Test
+    public void release() throws Exception {
+        lock = createLock(props);
+        
+        assertTrue(lock.lock());
+        
+        lock.release();
+        
+        assertNull(lock.lockConnection);
+        assertTableIsUnlocked();
+    }
+
+    @Test
+    public void releaseShouldSucceedForAnAlreadyClosedConnection() throws Exception {
+        lock = createLock(props);
+        
+        assertTrue(lock.lock());
+        
+        lock.lockConnection.rollback(); // release the lock
+        lock.lockConnection.close();
+        lock.release();
+        
+        assertTableIsUnlocked();
+    }
+
+    @Test
+    public void releaseShouldSucceedForANullConnectionReference() throws Exception {
+        lock = createLock(props);
+        
+        assertTrue(lock.lock());
+        
+        lock.lockConnection.rollback(); // release the lock
+        lock.lockConnection.close();
+        lock.lockConnection = null;
+        lock.release();
+        
+        assertTableIsUnlocked();
+    }
+
+    @Test
+    public void isAliveShouldReturnTrueIfItHoldsTheLock() throws Exception {
+        lock = createLock(props);
+        
+        assertTrue(lock.lock());
+        assertTrue(lock.isAlive());
+    }
+
+    @Test
+    public void isAliveShouldReturnFalseIfTheConnectionIsClosed() throws Exception {
+        lock = createLock(props);
+        
+        assertTrue(lock.lock());
+        
+        lock.lockConnection.rollback(); // release the lock
+        lock.lockConnection.close();
+        
+        assertFalse(lock.isAlive());
+    }
+
+    @Test
+    public void isAliveShouldReturnFalseIfTheConnectionIsNull() throws Exception {
+        lock = createLock(props);
+        
+        assertTrue(lock.lock());
+        
+        lock.lockConnection.rollback(); // release the lock
+        lock.lockConnection.close();
+        lock.lockConnection = null;
+        
+        assertFalse(lock.isAlive());
+    }
+
+    @Test
+    public void isAliveShouldReturnFalseIfItNotHoldsTheLock() throws Exception {
+        Connection connection = null;
+        try {
+            lock = createLock(props);
+            
+            assertTrue(lock.lock());
+            
+            lock.lockConnection.rollback(); // release the lock
+            connection = lock(tableName, clustername); // another connection locks the table
+            
+            assertFalse(lock.isAlive());
+        } finally {
+            close(connection);            
+        }
+    }
+
+    Connection getConnection(String url, String user, String password) throws ClassNotFoundException, SQLException {
+        Class.forName(driver);
+        Connection connection = DriverManager.getConnection(url, user, password);
+        connection.setAutoCommit(false);
+        return connection;
+    }
+
+    void executeStatement(String stmt) throws SQLException, ClassNotFoundException {
+        Connection connection = null;
+        Statement statement = null;
+        
+        try {
+            connection = getConnection(url, user, password);
+            statement = connection.createStatement();
+            statement.setQueryTimeout(timeout);
+            statement.execute(stmt);    
+            connection.commit();
+        } finally {
+            close(statement);
+            close(connection);
+        }
+    }
+
+    Long queryDatabaseSingleResult(String query) throws ClassNotFoundException, SQLException {
+        Connection connection = null;
+        Statement statement = null;
+        ResultSet rs = null;
+        
+        try {
+            connection = getConnection(url, user, password);
+            statement = connection.createStatement();
+            rs = statement.executeQuery(query);
+            rs.next();
+            return rs.getLong(1);
+        } finally {
+            close(rs);
+            close(statement);
+            close(connection);
+        }
+    }
+
+    void assertTableIsLocked() throws ClassNotFoundException, SQLException {
+        try {
+            executeStatement("UPDATE " + tableName + " SET MOMENT = " + System.currentTimeMillis());
+            fail("SQLException for timeout expected because the table should be already locked");
+        } catch (SQLException sqle) {
+            // expected
+        }
+    }
+
+    void assertTableIsUnlocked() throws ClassNotFoundException, SQLException {
+        executeStatement("UPDATE " + tableName + " SET MOMENT = " + System.currentTimeMillis());
+    }
+
+    Connection lock(String table, String node) throws ClassNotFoundException, SQLException {
+        Connection connection = null;
+        Statement statement = null;
+        
+        try {
+            connection = getConnection(url, user, password);
+            statement = connection.createStatement();
+            //statement.execute("SELECT * FROM " + table + " WHERE NODE = '" + node + "' FOR UPDATE");
+            //statement.execute("UPDATE " + table + " SET MOMENT = " + System.currentTimeMillis() + " WHERE NODE = '" + node + "'");
+            statement.execute("SELECT * FROM " + table + " FOR UPDATE");
+            statement.execute("UPDATE " + table + " SET MOMENT = " + System.currentTimeMillis());
+        } finally {
+            close(statement);
+            // connection must not be closed!
+        }
+        
+        return connection;
+    }
+
+    void close(ResultSet rs) throws SQLException {
+        if (rs != null) {
+            try {
+                rs.close();
+            } catch (Exception e) {
+            }
+        }
+    }
+
+    void close(Statement statement) throws SQLException {
+        if (statement != null) {
+            try {
+                statement.close();
+            } catch (Exception e) {
+                LOG.severe("Can't close the statement: " + e);
+            }
+        }
+    }
+
+    void close(Connection connection) throws SQLException {
+        if (connection != null) {
+            try {
+                connection.rollback();
+            } catch (Exception e) {
+                LOG.severe("Can't rollback the connection: " + e);
+            }
+            try {
+                connection.close();
+            } catch (Exception e) {
+                LOG.severe("Can't close the connection: " + e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/karaf/main/src/test/java/org/apache/felix/karaf/main/BaseJDBCLockTest.java b/karaf/main/src/test/java/org/apache/felix/karaf/main/BaseJDBCLockTest.java
new file mode 100644
index 0000000..8f9de04
--- /dev/null
+++ b/karaf/main/src/test/java/org/apache/felix/karaf/main/BaseJDBCLockTest.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.karaf.main;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public abstract class BaseJDBCLockTest {
+    
+    DefaultJDBCLock lock;
+    Properties props;
+    String user = "root";
+    String password = "";
+    String driver;
+    String url;
+    String tableName = "LOCK_TABLE";
+    String clustername = "karaf_cluster";
+    int timeout = 10;
+    String momentDatatype = "BIGINT";
+    String nodeDatatype = "VARCHAR(20)";
+    String createTableStmtSuffix = "";
+    
+    Connection connection;
+    DatabaseMetaData metaData;
+    ResultSet resultSet;
+    PreparedStatement preparedStatement;
+    Statement statement;
+    
+    abstract DefaultJDBCLock createLock(Properties props);
+    
+    @BeforeClass
+    public static void setUpTestSuite() {
+        Properties properties = new Properties();
+        properties.put("karaf.bootstrap.log", "target/karaf.log");
+        BootstrapLogManager.setProperties(properties);        
+    }
+    
+    @Before
+    public void setUp() throws Exception {
+        connection = EasyMock.createMock(Connection.class);
+        metaData = EasyMock.createMock(DatabaseMetaData.class);
+        resultSet = EasyMock.createMock(ResultSet.class);
+        preparedStatement = EasyMock.createMock(PreparedStatement.class);
+        statement = EasyMock.createMock(Statement.class);
+        
+        props = new Properties();
+        props.put("karaf.lock.jdbc.url", url);
+        props.put("karaf.lock.jdbc.driver", driver);
+        props.put("karaf.lock.jdbc.user", user);
+        props.put("karaf.lock.jdbc.password", password);
+        props.put("karaf.lock.jdbc.table", tableName);
+        props.put("karaf.lock.jdbc.clustername", clustername);
+        props.put("karaf.lock.jdbc.timeout", timeout);
+    }
+    
+    @Test
+    public void initShouldCreateTheSchemaIfItNotExists() throws Exception {
+        expect(connection.isClosed()).andReturn(false);
+        connection.setAutoCommit(false);
+        expect(connection.getMetaData()).andReturn(metaData);
+        expect(metaData.getTables((String) isNull(), (String) isNull(), eq("LOCK_TABLE"), aryEq(new String[] {"TABLE"}))).andReturn(resultSet);
+        expect(resultSet.next()).andReturn(false);
+        resultSet.close();
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.createStatement()).andReturn(statement);
+        expect(statement.execute("CREATE TABLE " + tableName + " (MOMENT " + momentDatatype + ", NODE " + nodeDatatype + ")" + createTableStmtSuffix)).andReturn(false);
+        expect(statement.execute("INSERT INTO " + tableName + " (MOMENT, NODE) VALUES (1, '" + clustername + "')")).andReturn(false);
+        statement.close();
+        connection.commit();
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        lock = createLock(props);
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+    }
+    
+    @Test
+    public void initShouldNotCreateTheSchemaIfItAlreadyExists() throws Exception {
+        connection.setAutoCommit(false);
+        expect(connection.getMetaData()).andReturn(metaData);
+        expect(metaData.getTables((String) isNull(), (String) isNull(), eq("LOCK_TABLE"), aryEq(new String[] {"TABLE"}))).andReturn(resultSet);
+        expect(resultSet.next()).andReturn(true);
+        resultSet.close();
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        lock = createLock(props);
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+    }
+    
+    @Test
+    public void lockShouldReturnTrueItTheTableIsNotLocked() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.prepareStatement("SELECT * FROM " + tableName + " FOR UPDATE")).andReturn(preparedStatement);
+        preparedStatement.setQueryTimeout(10);
+        expect(preparedStatement.execute()).andReturn(true);
+        preparedStatement.close();
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.prepareStatement("UPDATE " + tableName + " SET MOMENT = 1")).andReturn(preparedStatement);
+        preparedStatement.setQueryTimeout(10);
+        expect(preparedStatement.executeUpdate()).andReturn(1);
+        preparedStatement.close();
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        boolean lockAquired = lock.lock();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+        assertTrue(lockAquired);
+    }
+    
+    @Test
+    public void lockShouldReturnFalseIfAnotherRowIsLocked() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.prepareStatement("SELECT * FROM " + tableName + " FOR UPDATE")).andReturn(preparedStatement);
+        preparedStatement.setQueryTimeout(10);
+        expect(preparedStatement.execute()).andReturn(true);
+        preparedStatement.close();
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.prepareStatement("UPDATE " + tableName + " SET MOMENT = 1")).andReturn(preparedStatement);
+        preparedStatement.setQueryTimeout(10);
+        expect(preparedStatement.executeUpdate()).andThrow(new SQLException());
+        preparedStatement.close();
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        boolean lockAquired = lock.lock();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+        assertFalse(lockAquired);
+    }
+    
+    @Test
+    public void lockShouldReturnFalseIfTheRowIsAlreadyLocked() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.prepareStatement("SELECT * FROM " + tableName + " FOR UPDATE")).andReturn(preparedStatement);
+        preparedStatement.setQueryTimeout(10);
+        expect(preparedStatement.execute()).andReturn(true);
+        preparedStatement.close();
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.prepareStatement("UPDATE " + tableName + " SET MOMENT = 1")).andReturn(preparedStatement);
+        preparedStatement.setQueryTimeout(10);
+        expect(preparedStatement.executeUpdate()).andThrow(new SQLException());
+        preparedStatement.close();
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        boolean lockAquired = lock.lock();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+        assertFalse(lockAquired);
+    }
+    
+    @Test
+    public void release() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.isClosed()).andReturn(false);
+        connection.rollback();
+        connection.close();
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        lock.release();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+    }
+    
+    @Test
+    public void releaseShouldSucceedForAnAlreadyClosedConnection() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        expect(connection.isClosed()).andReturn(true);
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        lock.release();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+    }
+    
+    @Test
+    public void releaseShouldSucceedForANullConnectionReference() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        lock.lockConnection = null;
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        lock.release();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+    }
+    
+    @Test
+    public void isAliveShouldReturnTrueIfItHoldsTheLock() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.prepareStatement("UPDATE " + tableName + " SET MOMENT = 1")).andReturn(preparedStatement);
+        preparedStatement.setQueryTimeout(10);
+        expect(preparedStatement.executeUpdate()).andReturn(1);
+        preparedStatement.close();
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        boolean alive = lock.isAlive();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+        assertTrue(alive);
+    }
+    
+    @Test
+    public void isAliveShouldReturnFalseIfTheConnectionIsClosed() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        expect(connection.isClosed()).andReturn(true);
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        boolean alive = lock.isAlive();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+        assertFalse(alive);
+    }
+    
+    @Test
+    public void isAliveShouldReturnFalseIfTheConnectionIsNull() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        lock.lockConnection = null;
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        boolean alive = lock.isAlive();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+        assertFalse(alive);
+    }
+    
+    @Test
+    public void isAliveShouldReturnFalseIfItNotHoldsTheLock() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.prepareStatement("UPDATE " + tableName + " SET MOMENT = 1")).andReturn(preparedStatement);
+        preparedStatement.setQueryTimeout(10);
+        expect(preparedStatement.executeUpdate()).andThrow(new SQLException());
+        preparedStatement.close();
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        boolean alive = lock.isAlive();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+        assertFalse(alive);
+    }
+}
\ No newline at end of file
diff --git a/karaf/main/src/test/java/org/apache/felix/karaf/main/DefaultJDBCLockIntegrationTest.java b/karaf/main/src/test/java/org/apache/felix/karaf/main/DefaultJDBCLockIntegrationTest.java
new file mode 100644
index 0000000..c2d586d
--- /dev/null
+++ b/karaf/main/src/test/java/org/apache/felix/karaf/main/DefaultJDBCLockIntegrationTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.karaf.main;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+@Ignore
+public class DefaultJDBCLockIntegrationTest extends BaseJDBCLockIntegrationTest {
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        password = "root";
+        driver = "org.apache.derby.jdbc.ClientDriver";
+        url = "jdbc:derby://127.0.0.1:1527/test";
+        
+        super.setUp();
+    }
+    
+    @Override
+    DefaultJDBCLock createLock(Properties props) {
+        return new DefaultJDBCLock(props);
+    }
+    
+    @Test
+    public void initShouldCreateTheDatabaseIfItNotExists() throws Exception {
+        String database = "test" + System.currentTimeMillis();
+        url = "jdbc:derby://127.0.0.1:1527/" + database;
+        props.put("karaf.lock.jdbc.url", url);
+        lock = createLock(props);
+        lock.lock();
+        
+        assertTrue(lock.lockConnection.getMetaData().getURL().contains(database));
+    }
+}
\ No newline at end of file
diff --git a/karaf/main/src/test/java/org/apache/felix/karaf/main/DefaultJDBCLockTest.java b/karaf/main/src/test/java/org/apache/felix/karaf/main/DefaultJDBCLockTest.java
new file mode 100644
index 0000000..101aa20
--- /dev/null
+++ b/karaf/main/src/test/java/org/apache/felix/karaf/main/DefaultJDBCLockTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.karaf.main;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class DefaultJDBCLockTest extends BaseJDBCLockTest {
+    
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        password = "root";
+        driver = "org.apache.derby.jdbc.ClientDriver";
+        url = "jdbc:derby://127.0.0.1:1527/test";
+        
+        super.setUp();
+    }
+    
+    DefaultJDBCLock createLock(Properties props) {
+        return new DefaultJDBCLock(props) {
+            @Override
+            Connection doCreateConnection(String driver, String url, String username, String password) {
+                assertEquals(this.driver, driver);
+                assertEquals(this.url + ";create=true", url);
+                assertEquals(this.user, username);
+                assertEquals(this.password, password);
+                return connection;
+            }
+
+            @Override
+            long getCurrentTimeMillis() {
+                return 1;
+            }
+        };
+    }
+    
+    @Test
+    public void createConnectionShouldConcatinateOptionsCorrect() {
+        props.put("karaf.lock.jdbc.url", this.url + ";dataEncryption=false");
+        
+        lock = new DefaultJDBCLock(props) {
+            @Override
+            Connection doCreateConnection(String driver, String url, String username, String password) {
+                assertEquals(this.driver, driver);
+                assertEquals(this.url + ";create=true", url);
+                assertEquals(this.user, username);
+                assertEquals(this.password, password);
+                return connection;
+            }
+
+            @Override
+            long getCurrentTimeMillis() {
+                return 1;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/karaf/main/src/test/java/org/apache/felix/karaf/main/DerbyJDBCLockIntegrationTest.java b/karaf/main/src/test/java/org/apache/felix/karaf/main/DerbyJDBCLockIntegrationTest.java
new file mode 100644
index 0000000..d176edc
--- /dev/null
+++ b/karaf/main/src/test/java/org/apache/felix/karaf/main/DerbyJDBCLockIntegrationTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.karaf.main;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+@Ignore
+public class DerbyJDBCLockIntegrationTest extends BaseJDBCLockIntegrationTest {
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        password = "root";
+        driver = "org.apache.derby.jdbc.ClientDriver";
+        url = "jdbc:derby://127.0.0.1:1527/test";
+        
+        super.setUp();
+    }
+    
+    @Override
+    DefaultJDBCLock createLock(Properties props) {
+        return new DerbyJDBCLock(props);
+    }
+    
+    @Test
+    public void initShouldCreateTheDatabaseIfItNotExists() throws Exception {
+        String database = "test" + System.currentTimeMillis();
+        url = "jdbc:derby://127.0.0.1:1527/" + database;
+        props.put("karaf.lock.jdbc.url", url);
+        lock = createLock(props);
+        lock.lock();
+        
+        assertTrue(lock.lockConnection.getMetaData().getURL().contains(database));
+    }
+}
\ No newline at end of file
diff --git a/karaf/main/src/test/java/org/apache/felix/karaf/main/DerbyJDBCLockTest.java b/karaf/main/src/test/java/org/apache/felix/karaf/main/DerbyJDBCLockTest.java
new file mode 100644
index 0000000..548937f
--- /dev/null
+++ b/karaf/main/src/test/java/org/apache/felix/karaf/main/DerbyJDBCLockTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.karaf.main;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class DerbyJDBCLockTest extends BaseJDBCLockTest {
+    
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        password = "root";
+        driver = "org.apache.derby.jdbc.ClientDriver";
+        url = "jdbc:derby://127.0.0.1:1527/test";
+        
+        super.setUp();
+    }
+    
+    DerbyJDBCLock createLock(Properties props) {
+        return new DerbyJDBCLock(props) {
+            @Override
+            Connection doCreateConnection(String driver, String url, String username, String password) {
+                assertEquals(this.driver, driver);
+                assertEquals(this.url + ";create=true", url);
+                assertEquals(this.user, username);
+                assertEquals(this.password, password);
+                return connection;
+            }
+
+            @Override
+            long getCurrentTimeMillis() {
+                return 1;
+            }
+        };
+    }
+    
+    @Test
+    public void createConnectionShouldConcatinateOptionsCorrect() {
+        props.put("karaf.lock.jdbc.url", this.url + ";dataEncryption=false");
+        
+        lock = new DerbyJDBCLock(props) {
+            @Override
+            Connection doCreateConnection(String driver, String url, String username, String password) {
+                assertEquals(this.driver, driver);
+                assertEquals(this.url + ";create=true", url);
+                assertEquals(this.user, username);
+                assertEquals(this.password, password);
+                return connection;
+            }
+
+            @Override
+            long getCurrentTimeMillis() {
+                return 1;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/karaf/main/src/test/java/org/apache/felix/karaf/main/MySQLJDBCLockIntegrationTest.java b/karaf/main/src/test/java/org/apache/felix/karaf/main/MySQLJDBCLockIntegrationTest.java
new file mode 100644
index 0000000..4b5f7ca
--- /dev/null
+++ b/karaf/main/src/test/java/org/apache/felix/karaf/main/MySQLJDBCLockIntegrationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.karaf.main;
+
+
+import static org.junit.Assert.assertFalse;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+@Ignore
+public class MySQLJDBCLockIntegrationTest extends BaseJDBCLockIntegrationTest {
+
+    @Before
+    public void setUp() throws Exception {
+        driver = "com.mysql.jdbc.Driver";
+        url = "jdbc:mysql://127.0.0.1:3306/test";
+        
+        super.setUp();
+    }
+    
+    @Override
+    MySQLJDBCLock createLock(Properties props) {
+        return new MySQLJDBCLock(props);
+    }
+    
+    @Test
+    public void initShouldCreateTheDatabaseIfItNotExists() throws Exception {
+        String database = "test" + System.currentTimeMillis();
+        
+        try {
+            executeStatement("DROP DATABASE " + database);
+        } catch (Exception e) {
+            // expected if table dosn't exist
+        }
+        
+        url = "jdbc:mysql://127.0.0.1:3306/" + database;
+        props.put("karaf.lock.jdbc.url", url);
+        lock = createLock(props);
+        
+        
+        // should throw an exeption, if the database doesn't exists
+        Connection connection = getConnection("jdbc:mysql://127.0.0.1:3306/" + database, user, password);
+        assertFalse(connection.isClosed());
+        
+        executeStatement("DROP DATABASE " + database);
+        close(connection);
+    }
+}
\ No newline at end of file
diff --git a/karaf/main/src/test/java/org/apache/felix/karaf/main/MySQLJDBCLockTest.java b/karaf/main/src/test/java/org/apache/felix/karaf/main/MySQLJDBCLockTest.java
new file mode 100644
index 0000000..112701a
--- /dev/null
+++ b/karaf/main/src/test/java/org/apache/felix/karaf/main/MySQLJDBCLockTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.karaf.main;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class MySQLJDBCLockTest extends BaseJDBCLockTest {
+    
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        driver = "com.mysql.jdbc.Driver";
+        url = "jdbc:mysql://127.0.0.1:3306/test";
+        createTableStmtSuffix = " ENGINE=INNODB";
+        
+        super.setUp();
+    }
+    
+    MySQLJDBCLock createLock(Properties props) {
+        return new MySQLJDBCLock(props) {
+            @Override
+            Connection doCreateConnection(String driver, String url, String username, String password) {
+                assertEquals(this.driver, driver);
+                assertEquals(this.url + "?createDatabaseIfNotExist=true", url);
+                assertEquals(this.user, username);
+                assertEquals(this.password, password);
+                return connection;
+            }
+
+            @Override
+            long getCurrentTimeMillis() {
+                return 1;
+            }
+        };
+    }
+    
+    @Test
+    public void createConnectionShouldConcatinateOptionsCorrect() {
+        props.put("karaf.lock.jdbc.url", this.url + "?connectTimeout=10000");
+        
+        lock = new MySQLJDBCLock(props) {
+            @Override
+            Connection doCreateConnection(String driver, String url, String username, String password) {
+                assertEquals(this.driver, driver);
+                assertEquals(this.url + "&createDatabaseIfNotExist=true", url);
+                assertEquals(this.user, username);
+                assertEquals(this.password, password);
+                return connection;
+            }
+
+            @Override
+            long getCurrentTimeMillis() {
+                return 1;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/karaf/main/src/test/java/org/apache/felix/karaf/main/OracleJDBCLockIntegrationTest.java b/karaf/main/src/test/java/org/apache/felix/karaf/main/OracleJDBCLockIntegrationTest.java
new file mode 100644
index 0000000..08bacc2
--- /dev/null
+++ b/karaf/main/src/test/java/org/apache/felix/karaf/main/OracleJDBCLockIntegrationTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.karaf.main;
+
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.Ignore;
+
+
+@Ignore
+public class OracleJDBCLockIntegrationTest extends BaseJDBCLockIntegrationTest {
+    
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        password = "root";
+        driver = "oracle.jdbc.driver.OracleDriver";
+        url = "jdbc:oracle:thin:@172.16.16.133:1521:XE";
+        momentDatatype = "NuMBER(20)";
+        
+        super.setUp();
+    }
+
+    OracleJDBCLock createLock(Properties props) {
+        return new OracleJDBCLock(props);
+    }
+    
+    @Override
+    Connection lock(String table, String node) throws ClassNotFoundException, SQLException {
+        Connection connection = null;
+        Statement statement = null;
+        
+        try {
+            connection = getConnection(url, user, password);
+            statement = connection.createStatement();
+            statement.execute("SELECT * FROM " + table + " FOR UPDATE");
+        } finally {
+            close(statement);
+            // connection must not be closed!
+        }
+        
+        return connection;
+    }
+}
\ No newline at end of file
diff --git a/karaf/main/src/test/java/org/apache/felix/karaf/main/OracleJDBCLockTest.java b/karaf/main/src/test/java/org/apache/felix/karaf/main/OracleJDBCLockTest.java
new file mode 100644
index 0000000..c4c96d8
--- /dev/null
+++ b/karaf/main/src/test/java/org/apache/felix/karaf/main/OracleJDBCLockTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.karaf.main;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.*;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class OracleJDBCLockTest extends BaseJDBCLockTest {
+    
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        password = "root";
+        driver = "oracle.jdbc.driver.OracleDriver";
+        url = "jdbc:oracle:thin:@172.16.16.132:1521:XE";
+        momentDatatype = "NUMBER(20)";
+        
+        super.setUp();
+    }
+    
+    OracleJDBCLock createLock(Properties props) {
+        return new OracleJDBCLock(props) {
+            @Override
+            Connection doCreateConnection(String driver, String url, String username, String password) {
+                assertEquals(this.driver, driver);
+                assertEquals(this.url, url);
+                assertEquals(this.user, username);
+                assertEquals(this.password, password);
+                return connection;
+            }
+
+            @Override
+            long getCurrentTimeMillis() {
+                return 1;
+            }
+        };
+    }
+    
+    @Test
+    @Override
+    public void lockShouldReturnTrueItTheTableIsNotLocked() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.prepareStatement("SELECT * FROM " + tableName + " FOR UPDATE")).andReturn(preparedStatement);
+        preparedStatement.setQueryTimeout(10);
+        expect(preparedStatement.execute()).andReturn(true);
+        preparedStatement.close();
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        boolean lockAquired = lock.lock();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+        assertTrue(lockAquired);
+    }
+    
+    @Test
+    @Override
+    public void lockShouldReturnFalseIfAnotherRowIsLocked() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.prepareStatement("SELECT * FROM " + tableName + " FOR UPDATE")).andReturn(preparedStatement);
+        preparedStatement.setQueryTimeout(10);
+        expect(preparedStatement.execute()).andThrow(new SQLException());
+        preparedStatement.close();
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        boolean lockAquired = lock.lock();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+        assertFalse(lockAquired);
+    }
+    
+    @Test
+    @Override
+    public void lockShouldReturnFalseIfTheRowIsAlreadyLocked() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.prepareStatement("SELECT * FROM " + tableName + " FOR UPDATE")).andReturn(preparedStatement);
+        preparedStatement.setQueryTimeout(10);
+        expect(preparedStatement.execute()).andThrow(new SQLException());
+        preparedStatement.close();
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        boolean lockAquired = lock.lock();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+        assertFalse(lockAquired);
+    }
+    
+    @Test
+    public void isAliveShouldReturnTrueIfItHoldsTheLock() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.prepareStatement("SELECT * FROM " + tableName + " FOR UPDATE")).andReturn(preparedStatement);
+        preparedStatement.setQueryTimeout(10);
+        expect(preparedStatement.execute()).andReturn(true);
+        preparedStatement.close();
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        boolean alive = lock.isAlive();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+        assertTrue(alive);
+    }
+    
+    @Test
+    public void isAliveShouldReturnFalseIfItNotHoldsTheLock() throws Exception {
+        initShouldNotCreateTheSchemaIfItAlreadyExists();
+        reset(connection, metaData, statement, preparedStatement, resultSet);
+        
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.isClosed()).andReturn(false);
+        expect(connection.prepareStatement("SELECT * FROM " + tableName + " FOR UPDATE")).andReturn(preparedStatement);
+        preparedStatement.setQueryTimeout(10);
+        expect(preparedStatement.execute()).andThrow(new SQLException());
+        preparedStatement.close();
+        
+        replay(connection, metaData, statement, preparedStatement, resultSet);
+        
+        boolean alive = lock.isAlive();
+        
+        verify(connection, metaData, statement, preparedStatement, resultSet);
+        assertFalse(alive);
+    }
+}
\ No newline at end of file
diff --git a/karaf/main/src/test/java/org/apache/felix/karaf/main/StatementsTest.java b/karaf/main/src/test/java/org/apache/felix/karaf/main/StatementsTest.java
new file mode 100644
index 0000000..87e31a8
--- /dev/null
+++ b/karaf/main/src/test/java/org/apache/felix/karaf/main/StatementsTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.karaf.main;
+
+import static org.junit.Assert.*;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class StatementsTest {
+    
+    private static final String DEFAULT_CREATE_TABLE_STMT = "CREATE TABLE KARAF_LOCK (MOMENT BIGINT, NODE VARCHAR(20))";
+    private static final String DEFAULT_POPULATE_TABLE_STMT = "INSERT INTO KARAF_LOCK (MOMENT, NODE) VALUES (1, 'karaf')";
+    
+    private Statements statements;
+    
+    @Before
+    public void setUp() {
+        statements = new Statements();
+    }
+
+    @Test
+    public void getDefaultLockCreateSchemaStatements() {
+        assertArrayEquals(new String[] {DEFAULT_CREATE_TABLE_STMT, DEFAULT_POPULATE_TABLE_STMT}, statements.getLockCreateSchemaStatements(1));
+    }
+    
+    @Test
+    public void getCustomLockCreateSchemaStatements() {
+        customizeStatements();
+        String[] expectedCreateSchemaStmts = new String[] {
+                "CREATE TABLE test.LOCK_TABLE (MOMENT NUMBER(20), NODE VARCHAR2(30))", 
+                "INSERT INTO test.LOCK_TABLE (MOMENT, NODE) VALUES (2, 'node_1')"};
+        
+        assertArrayEquals(expectedCreateSchemaStmts, statements.getLockCreateSchemaStatements(2));
+    }
+
+    @Test
+    public void getDefaultLockCreateStatement() {
+        assertEquals("SELECT * FROM KARAF_LOCK FOR UPDATE", statements.getLockCreateStatement());
+    }
+    
+    @Test
+    public void getCustomLockCreateStatement() {
+        customizeStatements();
+        
+        assertEquals("SELECT * FROM test.LOCK_TABLE FOR UPDATE", statements.getLockCreateStatement());
+    }
+
+    @Test
+    public void getDefaultLockUpdateStatement() {
+        assertEquals("UPDATE KARAF_LOCK SET MOMENT = 1", statements.getLockUpdateStatement(1));
+    }
+    
+    @Test
+    public void getCustomLockUpdateStatement() {
+        customizeStatements();
+        
+        assertEquals("UPDATE test.LOCK_TABLE SET MOMENT = 2", statements.getLockUpdateStatement(2));
+    }
+    
+    private void customizeStatements() {
+        statements.setTablePrefix("test.");
+        statements.setTableName("LOCK_TABLE");
+        statements.setNodeName("node_1");
+        statements.setMomentColumnDataType("NUMBER(20)");
+        statements.setNodeColumnDataType("VARCHAR2(30)");
+    }
+}
\ No newline at end of file