Add client-side timer so DistributedLock#tryLock(Duration) can block while the partition is unavailable
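
Previously, lock attempts completed exceptionally with
StorageException.Unavailable as soon as the Raft proxy disconnected, so
tryLock(Duration) could not ride out a partition outage. Each attempt is
now tracked as a LockAttempt future with an optional client-side timer:
if the timer fires before the cluster responds, the attempt completes
empty and a compensating UNLOCK, keyed by the attempt's unique ID, is
sent in case the cluster grants the lock later. The LOCK/FAIL events are
renamed to LOCKED/FAILED, the no-argument tryLock() now returns an empty
result instead of failing when the proxy is disconnected, the unlock
command now verifies the lock ID on the server, and pending queue
entries are removed when a session closes.

A minimal sketch of the intended caller-side usage; the helper class,
method, and timeout below are illustrative, not part of this change:

    import java.time.Duration;
    import org.onosproject.store.service.AsyncDistributedLock;

    public final class TryLockExample {
        // Tries to acquire the lock for up to five seconds. An empty result
        // means the timeout elapsed, either because another process holds the
        // lock or because the partition was unreachable.
        public static void runLocked(AsyncDistributedLock lock, Runnable task) {
            lock.tryLock(Duration.ofSeconds(5)).thenAccept(version -> {
                if (version.isPresent()) {
                    try {
                        task.run();
                    } finally {
                        lock.unlock();
                    }
                }
            });
        }
    }
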
Change-Id: Iccae74ff2139d83525a7ff185d916826014914aa
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
index c5a4250..c234be3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
@@ -16,22 +16,29 @@
package org.onosproject.store.primitives.resources.impl;
import java.time.Duration;
-import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import com.google.common.collect.Maps;
import io.atomix.protocols.raft.proxy.RaftProxy;
-import io.atomix.utils.concurrent.Futures;
import org.onlab.util.KryoNamespace;
+import org.onlab.util.OrderedExecutor;
+import org.onlab.util.SharedScheduledExecutors;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncDistributedLock;
import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.Version;
+import static org.onlab.util.Tools.orderedFuture;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAILED;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.LOCKED;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
@@ -39,6 +46,9 @@
/**
* Atomix lock implementation.
+ * <p>
+ * This {@link org.onosproject.store.service.DistributedLock} implementation uses a {@link RaftProxy} to interact
+ * with an {@link AtomixDistributedLockService} replicated state machine.
*/
public class AtomixDistributedLock extends AbstractRaftPrimitive implements AsyncDistributedLock {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
@@ -47,108 +57,125 @@
.register(AtomixDistributedLockEvents.NAMESPACE)
.build());
- private final Map<Integer, CompletableFuture<Version>> futures = new ConcurrentHashMap<>();
+ private final ScheduledExecutorService scheduledExecutor;
+ private final Executor orderedExecutor;
+ private final Map<Integer, LockAttempt> attempts = Maps.newConcurrentMap();
private final AtomicInteger id = new AtomicInteger();
- private int lock;
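+ // ID of the lock attempt that currently holds the lock; zero when no lock is held.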
+ private final AtomicInteger lock = new AtomicInteger();
public AtomixDistributedLock(RaftProxy proxy) {
super(proxy);
- proxy.addStateChangeListener(this::handleStateChange);
- proxy.addEventListener(AtomixDistributedLockEvents.LOCK, SERIALIZER::decode, this::handleLocked);
- proxy.addEventListener(AtomixDistributedLockEvents.FAIL, SERIALIZER::decode, this::handleFailed);
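+ // Client-side timers are scheduled on the shared executor pool; the ordered
+ // executor wrapper preserves the order in which future callbacks are executed.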
+ this.scheduledExecutor = SharedScheduledExecutors.getPoolThreadExecutor();
+ this.orderedExecutor = new OrderedExecutor(scheduledExecutor);
+ proxy.addEventListener(LOCKED, SERIALIZER::decode, this::handleLocked);
+ proxy.addEventListener(FAILED, SERIALIZER::decode, this::handleFailed);
}
+ /**
+ * Handles a {@code LOCKED} event.
+ *
+ * @param event the event to handle
+ */
private void handleLocked(LockEvent event) {
- CompletableFuture<Version> future = futures.remove(event.id());
- if (future != null) {
- this.lock = event.id();
- future.complete(new Version(event.version()));
+ // Remove the LockAttempt from the attempts map and complete it with the lock version if it exists.
+ // If the attempt no longer exists, it likely was expired by a client-side timer.
+ LockAttempt attempt = attempts.remove(event.id());
+ if (attempt != null) {
+ attempt.complete(new Version(event.version()));
}
}
+ /**
+ * Handles a {@code FAILED} event.
+ *
+ * @param event the event to handle
+ */
private void handleFailed(LockEvent event) {
- CompletableFuture<Version> future = futures.remove(event.id());
- if (future != null) {
- future.complete(null);
- }
- }
-
- private void handleStateChange(RaftProxy.State state) {
- if (state != RaftProxy.State.CONNECTED) {
- Iterator<Map.Entry<Integer, CompletableFuture<Version>>> iterator = futures.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Integer, CompletableFuture<Version>> entry = iterator.next();
- entry.getValue().completeExceptionally(new StorageException.Unavailable());
- proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(entry.getKey()));
- iterator.remove();
- }
- lock = 0;
+ // Remove the LockAttempt from the attempts map and complete it with a null value if it exists.
+ // If the attempt no longer exists, it likely was expired by a client-side timer.
+ LockAttempt attempt = attempts.remove(event.id());
+ if (attempt != null) {
+ attempt.complete(null);
}
}
@Override
public CompletableFuture<Version> lock() {
- RaftProxy.State state = proxy.getState();
- if (state != RaftProxy.State.CONNECTED) {
- return Futures.exceptionalFuture(new StorageException.Unavailable());
- }
-
- CompletableFuture<Version> future = new CompletableFuture<>();
- int id = this.id.incrementAndGet();
- futures.put(id, future);
- proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, -1)).whenComplete((result, error) -> {
+ // Create and register a new attempt and invoke the LOCK operation on the replicated state machine.
+ LockAttempt attempt = new LockAttempt();
+ proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), -1)).whenComplete((result, error) -> {
if (error != null) {
- futures.remove(id);
- future.completeExceptionally(error);
+ attempt.completeExceptionally(error);
}
});
- return future;
+
+ // Return an ordered future that can safely be blocked on from within an executor thread.
+ return orderedFuture(attempt, orderedExecutor, scheduledExecutor);
}
@Override
public CompletableFuture<Optional<Version>> tryLock() {
+ // If the proxy is currently disconnected from the cluster, we can just fail the lock attempt here.
RaftProxy.State state = proxy.getState();
if (state != RaftProxy.State.CONNECTED) {
- return Futures.exceptionalFuture(new StorageException.Unavailable());
+ return CompletableFuture.completedFuture(Optional.empty());
}
- CompletableFuture<Version> future = new CompletableFuture<>();
- int id = this.id.incrementAndGet();
- futures.put(id, future);
- proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, 0)).whenComplete((result, error) -> {
+ // Create and register a new attempt and invoke the LOCK operation on the replicated state machine with
+ // a 0 timeout. The timeout will cause the state machine to immediately reject the request if the lock is
+ // already owned by another process.
+ LockAttempt attempt = new LockAttempt();
+ proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), 0)).whenComplete((result, error) -> {
if (error != null) {
- futures.remove(id);
- future.completeExceptionally(error);
+ attempt.completeExceptionally(error);
}
});
- return future.thenApply(Optional::ofNullable);
+
+ // Return an ordered future that can safely be blocked on from within an executor thread.
+ return orderedFuture(attempt, orderedExecutor, scheduledExecutor)
+ .thenApply(Optional::ofNullable);
}
@Override
public CompletableFuture<Optional<Version>> tryLock(Duration timeout) {
- RaftProxy.State state = proxy.getState();
- if (state != RaftProxy.State.CONNECTED) {
- return Futures.exceptionalFuture(new StorageException.Unavailable());
- }
-
- CompletableFuture<Version> future = new CompletableFuture<>();
- int id = this.id.incrementAndGet();
- futures.put(id, future);
- proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, timeout.toMillis())).whenComplete((result, error) -> {
- if (error != null) {
- futures.remove(id);
- future.completeExceptionally(error);
- }
+ // Create a lock attempt with a client-side timeout and fail the lock if the timer expires.
+ // Because time does not progress at the same rate on different nodes, we can't guarantee that
+ // the lock won't be granted to this process after it's expired here. Thus, if this timer expires and
+ // we fail the lock on the client, we also still need to send an UNLOCK command to the cluster in case it's
+ // later granted by the cluster. Note that the semantics of the Raft client will guarantee this operation
+ // occurs after any prior LOCK attempt, and the Raft client will retry the UNLOCK request until successful.
+ // Additionally, sending the unique lock ID with the command ensures we won't accidentally release a
+ // lock acquired by a different call from this process.
+ LockAttempt attempt = new LockAttempt(timeout, a -> {
+ a.complete(null);
+ proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(a.id()));
});
- return future.thenApply(Optional::ofNullable);
+
+ // Invoke the LOCK operation on the replicated state machine with the given timeout. If the lock is currently
+ // held by another process, the state machine will add the attempt to a queue and publish a FAILED event if
+ // the timer expires before this process can be granted the lock. If the client cannot reach the Raft cluster,
+ // the client-side timer will expire the attempt.
+ proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), timeout.toMillis()))
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ attempt.completeExceptionally(error);
+ }
+ });
+
+ // Return an ordered future that can safely be blocked on from within an executor thread.
+ return orderedFuture(attempt, orderedExecutor, scheduledExecutor)
+ .thenApply(Optional::ofNullable);
}
@Override
public CompletableFuture<Void> unlock() {
- int lock = this.lock;
- this.lock = 0;
+ // Use the current lock ID to ensure we only unlock the lock currently held by this process.
+ int lock = this.lock.getAndSet(0);
if (lock != 0) {
- return proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock));
+ return orderedFuture(
+ proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock)),
+ orderedExecutor,
+ scheduledExecutor);
}
return CompletableFuture.completedFuture(null);
}
@@ -161,4 +188,60 @@
public CompletableFuture<Void> close() {
return proxy.close();
}
+
+ /**
+ * Lock attempt.
+ */
+ private class LockAttempt extends CompletableFuture<Version> {
+ private final int id;
+ private final ScheduledFuture<?> scheduledFuture;
+
+ LockAttempt() {
+ this(null, null);
+ }
+
+ LockAttempt(Duration duration, Consumer<LockAttempt> callback) {
+ this.id = AtomixDistributedLock.this.id.incrementAndGet();
+ this.scheduledFuture = duration != null && callback != null
+ ? scheduledExecutor.schedule(() -> callback.accept(this), duration.toMillis(), TimeUnit.MILLISECONDS)
+ : null;
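+ // Register the attempt so the LOCKED/FAILED event handlers can complete it.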
+ attempts.put(id, this);
+ }
+
+ /**
+ * Returns the lock attempt ID.
+ *
+ * @return the lock attempt ID
+ */
+ int id() {
+ return id;
+ }
+
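+ // If the attempt has already completed (e.g. its client-side timer expired), this is a no-op.
+ // Otherwise, cancel the timer and, when a version was granted, record this attempt's ID as the
+ // currently held lock before completing.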
+ @Override
+ public boolean complete(Version version) {
+ if (isDone()) {
+ return super.complete(null);
+ }
+ cancel();
+ if (version != null) {
+ lock.set(id);
+ return super.complete(version);
+ } else {
+ return super.complete(null);
+ }
+ }
+
+ @Override
+ public boolean completeExceptionally(Throwable ex) {
+ cancel();
+ return super.completeExceptionally(ex);
+ }
+
+ private void cancel() {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ }
+ attempts.remove(id);
+ }
+ }
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java
index 0ef9270..e6d2b2a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java
@@ -23,8 +23,8 @@
* Raft value events.
*/
public enum AtomixDistributedLockEvents implements EventType {
- LOCK("lock"),
- FAIL("fail");
+ LOCKED("lock"),
+ FAILED("fail");
private final String id;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
index a2dab95..e730ffd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
@@ -33,7 +33,8 @@
import org.onosproject.store.service.Serializer;
import static com.google.common.base.MoreObjects.toStringHelper;
-import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAIL;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAILED;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.LOCKED;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
@@ -71,6 +72,9 @@
public void install(SnapshotReader reader) {
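+ // Restore the current lock holder and the queue of waiters from the snapshot.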
lock = reader.readObject(SERIALIZER::decode);
queue = reader.readObject(SERIALIZER::decode);
+
+ // After the snapshot is installed, we need to cancel any existing timers and schedule new ones based on the
+ // state provided by the snapshot.
timers.values().forEach(Scheduled::cancel);
timers.clear();
for (LockHolder holder : queue) {
@@ -82,7 +86,7 @@
queue.remove(holder);
RaftSession session = sessions().getSession(holder.session);
if (session != null && session.getState().active()) {
- session.publish(FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
+ session.publish(FAILED, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
}
}));
}
@@ -105,6 +109,9 @@
* @param commit the lock commit
*/
protected void lock(Commit<Lock> commit) {
+ // If the lock is not already owned, immediately grant the lock to the requester.
+ // Note that we still have to publish an event to the session. The event is guaranteed to be received
+ // by the client-side primitive after the LOCK response.
if (lock == null) {
lock = new LockHolder(
commit.value().id(),
@@ -112,11 +119,14 @@
commit.session().sessionId().id(),
0);
commit.session().publish(
- AtomixDistributedLockEvents.LOCK,
+ LOCKED,
SERIALIZER::encode,
new LockEvent(commit.value().id(), commit.index()));
+ // If the timeout is 0, that indicates this is a tryLock request. Immediately fail the request.
} else if (commit.value().timeout() == 0) {
- commit.session().publish(FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
+ commit.session().publish(FAILED, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
+ // If a timeout exists, add the request to the queue and set a timer. Note that the lock request expiration
+ // time is based on the *state machine* time - not the system time - to ensure consistency across servers.
} else if (commit.value().timeout() > 0) {
LockHolder holder = new LockHolder(
commit.value().id(),
@@ -125,15 +135,19 @@
wallClock().getTime().unixTimestamp() + commit.value().timeout());
queue.add(holder);
timers.put(commit.index(), scheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> {
+ // When the lock request timer expires, remove the request from the queue and publish a FAILED
+ // event to the session. Note that this timer is guaranteed to be executed in the same thread as the
+ // state machine commands, so there's no need to use a lock here.
timers.remove(commit.index());
queue.remove(holder);
if (commit.session().getState().active()) {
commit.session().publish(
- FAIL,
+ FAILED,
SERIALIZER::encode,
new LockEvent(commit.value().id(), commit.index()));
}
}));
+ // If the timeout is -1, just add the request to the queue with no expiration.
} else {
LockHolder holder = new LockHolder(
commit.value().id(),
@@ -151,24 +165,35 @@
*/
protected void unlock(Commit<Unlock> commit) {
if (lock != null) {
+ // If the commit's session does not match the current lock holder, ignore the request.
if (lock.session != commit.session().sessionId().id()) {
return;
}
+ // If the current lock ID does not match the requested lock ID, ignore the request. This ensures that
+ // internal releases of locks that were never acquired by the client-side primitive do not cause
+ // legitimate locks to be unlocked.
+ if (lock.id != commit.value().id()) {
+ return;
+ }
+
+ // The lock has been released. Grant it to the next eligible waiter in the queue.
lock = queue.poll();
while (lock != null) {
+ // If the waiter has a lock timer, cancel the timer.
Scheduled timer = timers.remove(lock.index);
if (timer != null) {
timer.cancel();
}
+ // If the lock session is for some reason inactive, continue on to the next waiter. Otherwise,
+ // publish a LOCKED event to the new lock holder's session.
RaftSession session = sessions().getSession(lock.session);
- if (session == null || session.getState() == RaftSession.State.EXPIRED
- || session.getState() == RaftSession.State.CLOSED) {
+ if (session == null || !session.getState().active()) {
lock = queue.poll();
} else {
session.publish(
- AtomixDistributedLockEvents.LOCK,
+ LOCKED,
SERIALIZER::encode,
new LockEvent(lock.id, commit.index()));
break;
@@ -177,29 +202,41 @@
}
}
+ /**
+ * Handles a session that has been closed by a client or expired by the cluster.
+ * <p>
+ * When a session is removed, if the session is the current lock holder then the lock is released and the next
+ * session waiting in the queue is granted the lock. Additionally, all pending lock requests for the session
+ * are removed from the lock queue.
+ *
+ * @param session the closed session
+ */
private void releaseSession(RaftSession session) {
+ // Remove all instances of the session from the lock queue.
+ queue.removeIf(lock -> lock.session == session.sessionId().id());
+
+ // If the removed session is the current holder of the lock, nullify the lock and attempt to grant it
+ // to the next waiter in the queue.
if (lock != null && lock.session == session.sessionId().id()) {
lock = queue.poll();
while (lock != null) {
- if (lock.session == session.sessionId().id()) {
+ // If the waiter has a lock timer, cancel the timer.
+ Scheduled timer = timers.remove(lock.index);
+ if (timer != null) {
+ timer.cancel();
+ }
+
+ // If the lock session is inactive, continue on to the next waiter. Otherwise,
+ // publish a LOCKED event to the new lock holder's session.
+ RaftSession lockSession = sessions().getSession(lock.session);
+ if (lockSession == null || !lockSession.getState().active()) {
lock = queue.poll();
} else {
- Scheduled timer = timers.remove(lock.index);
- if (timer != null) {
- timer.cancel();
- }
-
- RaftSession lockSession = sessions().getSession(lock.session);
- if (lockSession == null || lockSession.getState() == RaftSession.State.EXPIRED
- || lockSession.getState() == RaftSession.State.CLOSED) {
- lock = queue.poll();
- } else {
- lockSession.publish(
- AtomixDistributedLockEvents.LOCK,
- SERIALIZER::encode,
- new LockEvent(lock.id, lock.index));
- break;
- }
+ lockSession.publish(
+ LOCKED,
+ SERIALIZER::encode,
+ new LockEvent(lock.id, lock.index));
+ break;
}
}
}