1. Adds a lockAsync method to LockService for async lock acquisition.
2. Fixes a bug where lock() wasn't attempting a tryLock before registering for lock avalilability.
(Note 1 above is needed for LeadershipService which will come later)
Change-Id: I1deaa445f7cdf86416b335df1d7358e17eff19c3
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
index ed9a9e6..2f5f112 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
@@ -1,5 +1,7 @@
package org.onlab.onos.store.service;
+import java.util.concurrent.Future;
+
/**
* A lock is a tool for controlling access to a shared resource by multiple processes.
* Commonly, a lock provides exclusive access to a resource such as a network device
@@ -36,6 +38,14 @@
void lock(int leaseDurationMillis) throws InterruptedException;
/**
+ * Acquires the lock asynchronously.
+ * @param leaseDurationMillis leaseDurationMillis the number of milliseconds the lock
+ * will be reserved before it becomes available for others.
+ * @return Future that can be used for blocking until lock is acquired.
+ */
+ Future<Void> lockAsync(int leaseDurationMillis);
+
+ /**
* Acquires the lock only if it is free at the time of invocation.
* @param leaseDurationMillis the number of milliseconds the must be
* locked after it is granted, before automatically releasing it if it hasn't
@@ -57,7 +67,7 @@
* elapsed before the lock was acquired
* @throws InterruptedException if the thread is interrupted while waiting
*/
- boolean tryLock(long waitTimeMillis, int leaseDurationMillis) throws InterruptedException;
+ boolean tryLock(int waitTimeMillis, int leaseDurationMillis) throws InterruptedException;
/**
* Returns true if this Lock instance currently holds the lock.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
index f84baaf..7575ed9 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -6,6 +6,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,20 +54,22 @@
@Override
public void lock(int leaseDurationMillis) throws InterruptedException {
- if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
- return;
- } else {
- CompletableFuture<DateTime> future =
- lockManager.lockIfAvailable(this, leaseDurationMillis);
- try {
- lockExpirationTime = future.get();
- } catch (ExecutionException e) {
- throw new DatabaseException(e);
- }
+ try {
+ lockAsync(leaseDurationMillis).get();
+ } catch (ExecutionException e) {
+ throw new DatabaseException(e);
}
}
@Override
+ public Future<Void> lockAsync(int leaseDurationMillis) {
+ if (isLocked() || tryLock(leaseDurationMillis)) {
+ return CompletableFuture.<Void>completedFuture(null);
+ }
+ return lockManager.lockIfAvailable(this, leaseDurationMillis);
+ }
+
+ @Override
public boolean tryLock(int leaseDurationMillis) {
if (databaseService.putIfAbsent(
DistributedLockManager.ONOS_LOCK_TABLE_NAME,
@@ -81,15 +84,16 @@
@Override
public boolean tryLock(
- long waitTimeMillis,
+ int waitTimeMillis,
int leaseDurationMillis) throws InterruptedException {
- if (tryLock(leaseDurationMillis)) {
+ if (isLocked() || tryLock(leaseDurationMillis)) {
return true;
}
- CompletableFuture<DateTime> future =
+
+ CompletableFuture<Void> future =
lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
try {
- lockExpirationTime = future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
+ future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
return true;
} catch (ExecutionException e) {
throw new DatabaseException(e);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
index 3ec7876..74f8373 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -111,17 +111,17 @@
* @param lock lock to acquire.
* @param waitTimeMillis maximum time to wait before giving up.
* @param leaseDurationMillis the duration for which to acquire the lock initially.
- * @return Future lease expiration date.
+ * @return Future that can be blocked on until lock becomes available.
*/
- protected CompletableFuture<DateTime> lockIfAvailable(
+ protected CompletableFuture<Void> lockIfAvailable(
Lock lock,
- long waitTimeMillis,
+ int waitTimeMillis,
int leaseDurationMillis) {
- CompletableFuture<DateTime> future = new CompletableFuture<>();
+ CompletableFuture<Void> future = new CompletableFuture<>();
LockRequest request = new LockRequest(
lock,
leaseDurationMillis,
- DateTime.now().plusMillis(leaseDurationMillis),
+ DateTime.now().plusMillis(waitTimeMillis),
future);
locksToAcquire.put(lock.path(), request);
return future;
@@ -133,10 +133,10 @@
* @param leaseDurationMillis the duration for which to acquire the lock initially.
* @return Future lease expiration date.
*/
- protected CompletableFuture<DateTime> lockIfAvailable(
+ protected CompletableFuture<Void> lockIfAvailable(
Lock lock,
int leaseDurationMillis) {
- CompletableFuture<DateTime> future = new CompletableFuture<>();
+ CompletableFuture<Void> future = new CompletableFuture<>();
LockRequest request = new LockRequest(
lock,
leaseDurationMillis,
@@ -189,7 +189,7 @@
existingRequestIterator.remove();
} else {
if (request.lock().tryLock(request.leaseDurationMillis())) {
- request.future().complete(DateTime.now().plusMillis(request.leaseDurationMillis()));
+ request.future().complete(null);
existingRequestIterator.remove();
}
}
@@ -203,13 +203,13 @@
private final Lock lock;
private final DateTime requestExpirationTime;
private final int leaseDurationMillis;
- private final CompletableFuture<DateTime> future;
+ private final CompletableFuture<Void> future;
public LockRequest(
Lock lock,
int leaseDurationMillis,
DateTime requestExpirationTime,
- CompletableFuture<DateTime> future) {
+ CompletableFuture<Void> future) {
this.lock = lock;
this.requestExpirationTime = requestExpirationTime;
@@ -229,7 +229,7 @@
return leaseDurationMillis;
}
- public CompletableFuture<DateTime> future() {
+ public CompletableFuture<Void> future() {
return future;
}
}