Add client-side timer for blocking on DistributedLock#tryLock(Duration) while partition is unavailable

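On the state machine side, this change renames the LOCK and FAIL
session events to LOCKED and FAILED, checks the lock ID on unlock so
that releases of lock attempts the client never actually acquired
cannot unlock a legitimately held lock, removes a closed or expired
session's pending lock requests from the queue before granting the
lock to the next active waiter, and documents the lock, unlock, and
session-release paths.
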
Change-Id: Iccae74ff2139d83525a7ff185d916826014914aa
(cherry picked from commit 1242168c763bb3c8d915ca4f377abc4729879e93)
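
The client-side timer itself is not part of this excerpt, which
covers only the replicated state machine. As a minimal sketch of the
technique (the class and method names below are hypothetical, not
code from this commit), a blocking tryLock(Duration) can race the
server's LOCKED/FAILED response against a local timer so the call
still completes when the partition is unavailable:

    import java.time.Duration;
    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    // Hypothetical helper; illustrates the client-side timer only.
    class ClientSideLockTimer {
        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

        CompletableFuture<Optional<Long>> tryLock(
                CompletableFuture<Optional<Long>> serverResponse, Duration timeout) {
            CompletableFuture<Optional<Long>> attempt = new CompletableFuture<>();
            // Local timer: if no response arrives before the timeout elapses
            // (e.g. the partition is unavailable), fail the attempt locally.
            ScheduledFuture<?> timer = scheduler.schedule(
                () -> attempt.complete(Optional.empty()),
                timeout.toMillis(), TimeUnit.MILLISECONDS);
            // A server response cancels the timer; complete() is a no-op on an
            // already-completed future, so whichever side finishes first wins.
            serverResponse.whenComplete((result, error) -> {
                timer.cancel(false);
                attempt.complete(error == null ? result : Optional.empty());
            });
            return attempt;
        }
    }

Cancelling the timer on a server response avoids spuriously failing
an attempt that the state machine has already granted.
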
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
index a2dab95..e730ffd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
@@ -33,7 +33,8 @@
 import org.onosproject.store.service.Serializer;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
-import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAIL;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAILED;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.LOCKED;
 import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
 import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
 import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
@@ -71,6 +72,9 @@
     public void install(SnapshotReader reader) {
         lock = reader.readObject(SERIALIZER::decode);
         queue = reader.readObject(SERIALIZER::decode);
+
+        // After the snapshot is installed, we need to cancel any existing timers and schedule new ones based on the
+        // state provided by the snapshot.
         timers.values().forEach(Scheduled::cancel);
         timers.clear();
         for (LockHolder holder : queue) {
@@ -82,7 +86,7 @@
                             queue.remove(holder);
                             RaftSession session = sessions().getSession(holder.session);
                             if (session != null && session.getState().active()) {
-                                session.publish(FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
+                                session.publish(FAILED, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
                             }
                         }));
             }
@@ -105,6 +109,9 @@
      * @param commit the lock commit
      */
     protected void lock(Commit<Lock> commit) {
+        // If the lock is not already owned, immediately grant the lock to the requester.
+        // Note that we still have to publish an event to the session. The event is guaranteed to be received
+        // by the client-side primitive after the LOCK response.
         if (lock == null) {
             lock = new LockHolder(
                 commit.value().id(),
@@ -112,11 +119,14 @@
                 commit.session().sessionId().id(),
                 0);
             commit.session().publish(
-                AtomixDistributedLockEvents.LOCK,
+                LOCKED,
                 SERIALIZER::encode,
                 new LockEvent(commit.value().id(), commit.index()));
+        // If the timeout is 0, this is an immediate tryLock attempt; fail the request since the lock is held.
         } else if (commit.value().timeout() == 0) {
-            commit.session().publish(FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
+            commit.session().publish(FAILED, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
+        // If a timeout exists, add the request to the queue and set a timer. Note that the lock request expiration
+        // time is based on the *state machine* time - not the system time - to ensure consistency across servers.
         } else if (commit.value().timeout() > 0) {
             LockHolder holder = new LockHolder(
                 commit.value().id(),
@@ -125,15 +135,19 @@
                 wallClock().getTime().unixTimestamp() + commit.value().timeout());
             queue.add(holder);
             timers.put(commit.index(), scheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> {
+                // When the lock request timer expires, remove the request from the queue and publish a FAILED
+                // event to the session. Note that this timer is guaranteed to execute on the same thread as the
+                // state machine commands, so no additional synchronization is needed here.
                 timers.remove(commit.index());
                 queue.remove(holder);
                 if (commit.session().getState().active()) {
                     commit.session().publish(
-                        FAIL,
+                        FAILED,
                         SERIALIZER::encode,
                         new LockEvent(commit.value().id(), commit.index()));
                 }
             }));
+        // If the timeout is -1, just add the request to the queue with no expiration.
         } else {
             LockHolder holder = new LockHolder(
                 commit.value().id(),
@@ -151,24 +165,35 @@
      */
     protected void unlock(Commit<Unlock> commit) {
         if (lock != null) {
+            // If the commit's session does not match the current lock holder, ignore the request.
             if (lock.session != commit.session().sessionId().id()) {
                 return;
             }
 
+            // If the current lock ID does not match the requested lock ID, ignore the request. This ensures that
+            // releases of lock attempts that were never actually acquired by the client-side primitive cannot
+            // unlock a legitimately held lock.
+            if (lock.id != commit.value().id()) {
+                return;
+            }
+
+            // The lock has been released. Grant it to the next active waiter in the queue.
             lock = queue.poll();
             while (lock != null) {
+                // If the waiter has a lock timer, cancel the timer.
                 Scheduled timer = timers.remove(lock.index);
                 if (timer != null) {
                     timer.cancel();
                 }
 
+                // If the next waiter's session is inactive, skip it and poll the next waiter. Otherwise,
+                // publish a LOCKED event to the new lock holder's session.
                 RaftSession session = sessions().getSession(lock.session);
-                if (session == null || session.getState() == RaftSession.State.EXPIRED
-                    || session.getState() == RaftSession.State.CLOSED) {
+                if (session == null || !session.getState().active()) {
                     lock = queue.poll();
                 } else {
                     session.publish(
-                        AtomixDistributedLockEvents.LOCK,
+                        LOCKED,
                         SERIALIZER::encode,
                         new LockEvent(lock.id, commit.index()));
                     break;
@@ -177,29 +202,41 @@
         }
     }
 
+    /**
+     * Handles a session that has been closed by a client or expired by the cluster.
+     * <p>
+     * When a session is removed, if the session is the current lock holder then the lock is released and the next
+     * session waiting in the queue is granted the lock. Additionally, all pending lock requests for the session
+     * are removed from the lock queue.
+     *
+     * @param session the closed session
+     */
     private void releaseSession(RaftSession session) {
+        // Remove all instances of the session from the lock queue.
+        queue.removeIf(lock -> lock.session == session.sessionId().id());
+
+        // If the removed session is the current holder of the lock, nullify the lock and attempt to grant it
+        // to the next waiter in the queue.
         if (lock != null && lock.session == session.sessionId().id()) {
             lock = queue.poll();
             while (lock != null) {
-                if (lock.session == session.sessionId().id()) {
+                // If the waiter has a lock timer, cancel the timer.
+                Scheduled timer = timers.remove(lock.index);
+                if (timer != null) {
+                    timer.cancel();
+                }
+
+                // If the next waiter's session is inactive, skip it and poll the next waiter. Otherwise,
+                // publish a LOCKED event to the new lock holder's session.
+                RaftSession lockSession = sessions().getSession(lock.session);
+                if (lockSession == null || !lockSession.getState().active()) {
                     lock = queue.poll();
                 } else {
-                    Scheduled timer = timers.remove(lock.index);
-                    if (timer != null) {
-                        timer.cancel();
-                    }
-
-                    RaftSession lockSession = sessions().getSession(lock.session);
-                    if (lockSession == null || lockSession.getState() == RaftSession.State.EXPIRED
-                        || lockSession.getState() == RaftSession.State.CLOSED) {
-                        lock = queue.poll();
-                    } else {
-                        lockSession.publish(
-                            AtomixDistributedLockEvents.LOCK,
-                            SERIALIZER::encode,
-                            new LockEvent(lock.id, lock.index));
-                        break;
-                    }
+                    lockSession.publish(
+                        LOCKED,
+                        SERIALIZER::encode,
+                        new LockEvent(lock.id, lock.index));
+                    break;
                 }
             }
         }