Add client-side timer for blocking on DistributedLock#tryLock(Duration) while partition is unavailable
Change-Id: Iccae74ff2139d83525a7ff185d916826014914aa
(cherry picked from commit 1242168c763bb3c8d915ca4f377abc4729879e93)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
index a2dab95..e730ffd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
@@ -33,7 +33,8 @@
import org.onosproject.store.service.Serializer;
import static com.google.common.base.MoreObjects.toStringHelper;
-import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAIL;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAILED;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.LOCKED;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
@@ -71,6 +72,9 @@
public void install(SnapshotReader reader) {
lock = reader.readObject(SERIALIZER::decode);
queue = reader.readObject(SERIALIZER::decode);
+
+ // After the snapshot is installed, we need to cancel any existing timers and schedule new ones based on the
+ // state provided by the snapshot.
timers.values().forEach(Scheduled::cancel);
timers.clear();
for (LockHolder holder : queue) {
@@ -82,7 +86,7 @@
queue.remove(holder);
RaftSession session = sessions().getSession(holder.session);
if (session != null && session.getState().active()) {
- session.publish(FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
+ session.publish(FAILED, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
}
}));
}
@@ -105,6 +109,9 @@
* @param commit the lock commit
*/
protected void lock(Commit<Lock> commit) {
+ // If the lock is not already owned, immediately grant the lock to the requester.
+ // Note that we still have to publish an event to the session. The event is guaranteed to be received
+ // by the client-side primitive after the LOCK response.
if (lock == null) {
lock = new LockHolder(
commit.value().id(),
@@ -112,11 +119,14 @@
commit.session().sessionId().id(),
0);
commit.session().publish(
- AtomixDistributedLockEvents.LOCK,
+ LOCKED,
SERIALIZER::encode,
new LockEvent(commit.value().id(), commit.index()));
+ // If the timeout is 0, that indicates this is a tryLock request. Immediately fail the request.
} else if (commit.value().timeout() == 0) {
- commit.session().publish(FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
+ commit.session().publish(FAILED, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
+ // If a timeout exists, add the request to the queue and set a timer. Note that the lock request expiration
+ // time is based on the *state machine* time - not the system time - to ensure consistency across servers.
} else if (commit.value().timeout() > 0) {
LockHolder holder = new LockHolder(
commit.value().id(),
@@ -125,15 +135,19 @@
wallClock().getTime().unixTimestamp() + commit.value().timeout());
queue.add(holder);
timers.put(commit.index(), scheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> {
+ // When the lock request timer expires, remove the request from the queue and publish a FAILED
+ // event to the session. Note that this timer is guaranteed to be executed in the same thread as the
+ // state machine commands, so there's no need to use a lock here.
timers.remove(commit.index());
queue.remove(holder);
if (commit.session().getState().active()) {
commit.session().publish(
- FAIL,
+ FAILED,
SERIALIZER::encode,
new LockEvent(commit.value().id(), commit.index()));
}
}));
+ // If the timeout is negative (e.g. -1), just add the request to the queue with no expiration.
} else {
LockHolder holder = new LockHolder(
commit.value().id(),
@@ -151,24 +165,35 @@
*/
protected void unlock(Commit<Unlock> commit) {
if (lock != null) {
+ // If the commit's session does not match the current lock holder, ignore the request.
if (lock.session != commit.session().sessionId().id()) {
return;
}
+ // If the current lock ID does not match the requested lock ID, ignore the request. This ensures that
+ // internal releases of locks that were never acquired by the client-side primitive do not cause
+ // legitimate locks to be unlocked.
+ if (lock.id != commit.value().id()) {
+ return;
+ }
+
+ // The lock has been released. Populate the lock from the queue.
lock = queue.poll();
while (lock != null) {
+ // If the waiter has a lock timer, cancel the timer.
Scheduled timer = timers.remove(lock.index);
if (timer != null) {
timer.cancel();
}
+ // If the lock session is for some reason inactive, continue on to the next waiter. Otherwise,
+ // publish a LOCKED event to the new lock holder's session.
RaftSession session = sessions().getSession(lock.session);
- if (session == null || session.getState() == RaftSession.State.EXPIRED
- || session.getState() == RaftSession.State.CLOSED) {
+ if (session == null || !session.getState().active()) {
lock = queue.poll();
} else {
session.publish(
- AtomixDistributedLockEvents.LOCK,
+ LOCKED,
SERIALIZER::encode,
new LockEvent(lock.id, commit.index()));
break;
@@ -177,29 +202,41 @@
}
}
+ /**
+ * Handles a session that has been closed by a client or expired by the cluster.
+ * <p>
+ * When a session is removed, if the session is the current lock holder then the lock is released and the next
+ * session waiting in the queue is granted the lock. Additionally, all pending lock requests for the session
+ * are removed from the lock queue.
+ *
+ * @param session the closed session
+ */
private void releaseSession(RaftSession session) {
+ // Remove all instances of the session from the lock queue.
+ queue.removeIf(lock -> lock.session == session.sessionId().id());
+
+ // If the removed session is the current holder of the lock, nullify the lock and attempt to grant it
+ // to the next waiter in the queue.
if (lock != null && lock.session == session.sessionId().id()) {
lock = queue.poll();
while (lock != null) {
- if (lock.session == session.sessionId().id()) {
+ // If the waiter has a lock timer, cancel the timer.
+ Scheduled timer = timers.remove(lock.index);
+ if (timer != null) {
+ timer.cancel();
+ }
+
+ // If the lock session is inactive, continue on to the next waiter. Otherwise,
+ // publish a LOCKED event to the new lock holder's session.
+ RaftSession lockSession = sessions().getSession(lock.session);
+ if (lockSession == null || !lockSession.getState().active()) {
lock = queue.poll();
} else {
- Scheduled timer = timers.remove(lock.index);
- if (timer != null) {
- timer.cancel();
- }
-
- RaftSession lockSession = sessions().getSession(lock.session);
- if (lockSession == null || lockSession.getState() == RaftSession.State.EXPIRED
- || lockSession.getState() == RaftSession.State.CLOSED) {
- lock = queue.poll();
- } else {
- lockSession.publish(
- AtomixDistributedLockEvents.LOCK,
- SERIALIZER::encode,
- new LockEvent(lock.id, lock.index));
- break;
- }
+ lockSession.publish(
+ LOCKED,
+ SERIALIZER::encode,
+ new LockEvent(lock.id, lock.index));
+ break;
}
}
}