1. DatabaseManager activate will attempt to listTables to ensure store is in good shape.
2. lock and tryLock can now throw InterruptedExceptions.
Change-Id: Ifa766ad441f677a4071b68d8f6caa564cf320869
Change-Id: I318ff762a96b261737831f6bd7c200b384c638e9
Change-Id: I0f509703520b3187931fa3669cd8213a91e85c96
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
index f40ae06..7bb24df 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
@@ -160,7 +160,7 @@
}
}
- private void lockUnlock() {
+ private void lockUnlock() throws InterruptedException {
try {
final String locksTable = "onos-locks";
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
index 80f8e9e..85f9114 100644
--- a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
@@ -105,11 +105,11 @@
new ThreadFactoryBuilder()
.setNameFormat("sdnip-leader-election-%d").build());
leaderElectionExecutor.execute(new Runnable() {
- @Override
- public void run() {
- doLeaderElectionThread();
- }
- });
+ @Override
+ public void run() {
+ doLeaderElectionThread();
+ }
+ });
// Manually set the instance as the leader to allow testing
// TODO change this when we get a leader election
@@ -174,22 +174,22 @@
log.debug("SDN-IP Leader Election begin");
// Block until it becomes the leader
- leaderLock.lock(LEASE_DURATION_MS);
+ try {
+ leaderLock.lock(LEASE_DURATION_MS);
- // This instance is the leader
- log.info("SDN-IP Leader Elected");
- intentSynchronizer.leaderChanged(true);
+ // This instance is the leader
+ log.info("SDN-IP Leader Elected");
+ intentSynchronizer.leaderChanged(true);
- // Keep extending the expiration until shutdown
- int extensionFailedCountdown = LEASE_EXTEND_RETRY_MAX - 1;
+ // Keep extending the expiration until shutdown
+ int extensionFailedCountdown = LEASE_EXTEND_RETRY_MAX - 1;
- //
- // Keep periodically extending the lock expiration.
- // If there are multiple back-to-back failures to extend (with
- // extra sleep time between retrials), then release the lock.
- //
- while (!isShutdown) {
- try {
+ //
+ // Keep periodically extending the lock expiration.
+ // If there are multiple back-to-back failures to extend (with
+ // extra sleep time between retrials), then release the lock.
+ //
+ while (!isShutdown) {
Thread.sleep(LEASE_DURATION_MS / LEASE_EXTEND_RETRY_MAX);
if (leaderLock.extendExpiration(LEASE_DURATION_MS)) {
log.trace("SDN-IP Leader Extended");
@@ -211,13 +211,12 @@
break; // Try again to get the lock
}
}
- } catch (InterruptedException e) {
- // Thread interrupted. Time to shutdown
- log.debug("SDN-IP Leader Interrupted");
}
+ } catch (InterruptedException e) {
+ // Thread interrupted. Time to shutdown
+ log.debug("SDN-IP Leader Interrupted");
}
}
-
// If we reach here, the instance was shutdown
intentSynchronizer.leaderChanged(false);
leaderLock.unlock();
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
index dfd4742..ed9a9e6 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
@@ -31,8 +31,9 @@
* lock after granting it, before automatically releasing it if it hasn't
* already been released by invoking unlock(). Must be in the range
* (0, LockManager.MAX_LEASE_MILLIS]
+ * @throws InterruptedException if the thread is interrupted while waiting
*/
- void lock(int leaseDurationMillis);
+ void lock(int leaseDurationMillis) throws InterruptedException;
/**
* Acquires the lock only if it is free at the time of invocation.
@@ -54,8 +55,9 @@
* (0, LockManager.MAX_LEASE_MILLIS]
* @return true if the lock was acquired and false if the waiting time
* elapsed before the lock was acquired
+ * @throws InterruptedException if the thread is interrupted while waiting
*/
- boolean tryLock(long waitTimeMillis, int leaseDurationMillis);
+ boolean tryLock(long waitTimeMillis, int leaseDurationMillis) throws InterruptedException;
/**
* Returns true if this Lock instance currently holds the lock.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 37e75cf..af3f6ac 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -179,6 +179,11 @@
}
client.waitForLeader();
+
+ // Try and list the tables to verify database manager is
+ // in a state where it can serve requests.
+ tryTableListing();
+
log.info("Started.");
}
@@ -214,6 +219,24 @@
}
}
+ private void tryTableListing() {
+ int retries = 0;
+ do {
+ try {
+ listTables();
+ return;
+ } catch (DatabaseException e) {
+ if (retries == 10) {
+ log.error("Failed to listTables after multiple attempts. Giving up.", e);
+ throw e;
+ } else {
+ log.debug("Failed to listTables. Will retry...", e);
+ retries++;
+ }
+ }
+ } while (true);
+ }
+
@Override
public boolean createTable(String name) {
return client.createTable(name);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
index 4998179..f84baaf 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -12,6 +12,7 @@
import org.joda.time.DateTime;
import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.Lock;
import org.slf4j.Logger;
@@ -23,8 +24,6 @@
private final Logger log = getLogger(getClass());
- private static final long MAX_WAIT_TIME_MS = 100000000L;
-
private final DistributedLockManager lockManager;
private final DatabaseService databaseService;
private final String path;
@@ -53,13 +52,17 @@
}
@Override
- public void lock(int leaseDurationMillis) {
+ public void lock(int leaseDurationMillis) throws InterruptedException {
if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
- // Nothing to do.
- // Current expiration time is beyond what is requested.
return;
} else {
- tryLock(MAX_WAIT_TIME_MS, leaseDurationMillis);
+ CompletableFuture<DateTime> future =
+ lockManager.lockIfAvailable(this, leaseDurationMillis);
+ try {
+ lockExpirationTime = future.get();
+ } catch (ExecutionException e) {
+ throw new DatabaseException(e);
+ }
}
}
@@ -79,22 +82,17 @@
@Override
public boolean tryLock(
long waitTimeMillis,
- int leaseDurationMillis) {
+ int leaseDurationMillis) throws InterruptedException {
if (tryLock(leaseDurationMillis)) {
return true;
}
-
CompletableFuture<DateTime> future =
lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
try {
lockExpirationTime = future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
return true;
- } catch (ExecutionException | InterruptedException e) {
- log.error("Encountered an exception trying to acquire lock for " + path, e);
- // TODO: ExecutionException could indicate something
- // wrong with the backing database.
- // Throw an exception?
- return false;
+ } catch (ExecutionException e) {
+ throw new DatabaseException(e);
} catch (TimeoutException e) {
log.debug("Timed out waiting to acquire lock for {}", path);
return false;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
index ea0f7d5..3ec7876 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -64,23 +64,22 @@
@Activate
public void activate() {
try {
- Set<String> tableNames = databaseAdminService.listTables();
- if (!tableNames.contains(ONOS_LOCK_TABLE_NAME)) {
+ Set<String> tables = databaseAdminService.listTables();
+
+ if (!tables.contains(ONOS_LOCK_TABLE_NAME)) {
if (databaseAdminService.createTable(ONOS_LOCK_TABLE_NAME, DEAD_LOCK_TIMEOUT_MS)) {
log.info("Created {} table.", ONOS_LOCK_TABLE_NAME);
}
}
} catch (DatabaseException e) {
log.error("DistributedLockManager#activate failed.", e);
- throw e;
}
- clusterCommunicator.addSubscriber(
- DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
- new LockEventMessageListener());
+ clusterCommunicator.addSubscriber(
+ DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
+ new LockEventMessageListener());
- log.info("Started.");
-
+ log.info("Started");
}
@Deactivate
@@ -119,7 +118,31 @@
long waitTimeMillis,
int leaseDurationMillis) {
CompletableFuture<DateTime> future = new CompletableFuture<>();
- locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
+ LockRequest request = new LockRequest(
+ lock,
+ leaseDurationMillis,
+ DateTime.now().plusMillis(leaseDurationMillis),
+ future);
+ locksToAcquire.put(lock.path(), request);
+ return future;
+ }
+
+ /**
+ * Attempts to acquire the lock as soon as it becomes available.
+ * @param lock lock to acquire.
+ * @param leaseDurationMillis the duration for which to acquire the lock initially.
+ * @return Future lease expiration date.
+ */
+ protected CompletableFuture<DateTime> lockIfAvailable(
+ Lock lock,
+ int leaseDurationMillis) {
+ CompletableFuture<DateTime> future = new CompletableFuture<>();
+ LockRequest request = new LockRequest(
+ lock,
+ leaseDurationMillis,
+ DateTime.now().plusYears(100),
+ future);
+ locksToAcquire.put(lock.path(), request);
return future;
}
@@ -182,11 +205,14 @@
private final int leaseDurationMillis;
private final CompletableFuture<DateTime> future;
- public LockRequest(Lock lock, long waitTimeMillis,
- int leaseDurationMillis, CompletableFuture<DateTime> future) {
+ public LockRequest(
+ Lock lock,
+ int leaseDurationMillis,
+ DateTime requestExpirationTime,
+ CompletableFuture<DateTime> future) {
this.lock = lock;
- this.requestExpirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
+ this.requestExpirationTime = requestExpirationTime;
this.leaseDurationMillis = leaseDurationMillis;
this.future = future;
}