Add client-side timer to time out blocking DistributedLock#tryLock(Duration) calls while the partition is unavailable

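Lock attempts for tryLock(Duration) were previously expired only by the replicated
state machine, so while the partition was unavailable the returned future might never
complete and blocking callers could hang. Each attempt is now tracked as a LockAttempt
future; for tryLock(Duration) a client-side timer expires the attempt locally after the
requested timeout and sends a compensating UNLOCK in case the cluster later grants the
lock. Returned futures are additionally wrapped with orderedFuture() so callers can
block on them safely.

A minimal caller-side sketch of the new behavior (the method and the DistributedLock
handle are illustrative; it assumes the synchronous DistributedLock facade mirrors
AsyncDistributedLock's tryLock(Duration)/unlock() signatures, with tryLock returning
Optional<Version>):

    void doWithLock(DistributedLock lock) {
        // Blocks for at most roughly 5 seconds, even if the partition is unavailable.
        Optional<Version> version = lock.tryLock(Duration.ofSeconds(5));
        if (version.isPresent()) {
            try {
                // critical section guarded by the lock
            } finally {
                lock.unlock();
            }
        } else {
            // The attempt timed out: either rejected by the state machine or expired by
            // the client-side timer. In the latter case an UNLOCK was issued in case the
            // cluster grants this attempt the lock later.
        }
    }
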
Change-Id: Iccae74ff2139d83525a7ff185d916826014914aa
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
index c5a4250..c234be3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
@@ -16,22 +16,29 @@
 package org.onosproject.store.primitives.resources.impl;
 
 import java.time.Duration;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
+import com.google.common.collect.Maps;
 import io.atomix.protocols.raft.proxy.RaftProxy;
-import io.atomix.utils.concurrent.Futures;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.OrderedExecutor;
+import org.onlab.util.SharedScheduledExecutors;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncDistributedLock;
 import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageException;
 import org.onosproject.store.service.Version;
 
+import static org.onlab.util.Tools.orderedFuture;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAILED;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.LOCKED;
 import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
 import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
 import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
@@ -39,6 +46,9 @@
 
 /**
  * Atomix lock implementation.
+ * <p>
+ * This {@link org.onosproject.store.service.DistributedLock} implementation uses a {@link RaftProxy} to interact
+ * with an {@link AtomixDistributedLockService} replicated state machine.
  */
 public class AtomixDistributedLock extends AbstractRaftPrimitive implements AsyncDistributedLock {
     private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
@@ -47,108 +57,125 @@
         .register(AtomixDistributedLockEvents.NAMESPACE)
         .build());
 
-    private final Map<Integer, CompletableFuture<Version>> futures = new ConcurrentHashMap<>();
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Executor orderedExecutor;
+    private final Map<Integer, LockAttempt> attempts = Maps.newConcurrentMap();
     private final AtomicInteger id = new AtomicInteger();
-    private int lock;
+    private final AtomicInteger lock = new AtomicInteger();
 
     public AtomixDistributedLock(RaftProxy proxy) {
         super(proxy);
-        proxy.addStateChangeListener(this::handleStateChange);
-        proxy.addEventListener(AtomixDistributedLockEvents.LOCK, SERIALIZER::decode, this::handleLocked);
-        proxy.addEventListener(AtomixDistributedLockEvents.FAIL, SERIALIZER::decode, this::handleFailed);
+        this.scheduledExecutor = SharedScheduledExecutors.getPoolThreadExecutor();
+        this.orderedExecutor = new OrderedExecutor(scheduledExecutor);
+        proxy.addEventListener(LOCKED, SERIALIZER::decode, this::handleLocked);
+        proxy.addEventListener(FAILED, SERIALIZER::decode, this::handleFailed);
     }
 
+    /**
+     * Handles a {@code LOCKED} event.
+     *
+     * @param event the event to handle
+     */
     private void handleLocked(LockEvent event) {
-        CompletableFuture<Version> future = futures.remove(event.id());
-        if (future != null) {
-            this.lock = event.id();
-            future.complete(new Version(event.version()));
+        // Remove the LockAttempt from the attempts map and, if it's still present, complete it with the lock version.
+        // If the attempt is no longer present, it was most likely already expired by the client-side timer.
+        LockAttempt attempt = attempts.remove(event.id());
+        if (attempt != null) {
+            attempt.complete(new Version(event.version()));
         }
     }
 
+    /**
+     * Handles a {@code FAILED} event.
+     *
+     * @param event the event to handle
+     */
     private void handleFailed(LockEvent event) {
-        CompletableFuture<Version> future = futures.remove(event.id());
-        if (future != null) {
-            future.complete(null);
-        }
-    }
-
-    private void handleStateChange(RaftProxy.State state) {
-        if (state != RaftProxy.State.CONNECTED) {
-            Iterator<Map.Entry<Integer, CompletableFuture<Version>>> iterator = futures.entrySet().iterator();
-            while (iterator.hasNext()) {
-                Map.Entry<Integer, CompletableFuture<Version>> entry = iterator.next();
-                entry.getValue().completeExceptionally(new StorageException.Unavailable());
-                proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(entry.getKey()));
-                iterator.remove();
-            }
-            lock = 0;
+        // Remove the LockAttempt from the attempts map and, if it's still present, complete it with a null value.
+        // If the attempt is no longer present, it was most likely already expired by the client-side timer.
+        LockAttempt attempt = attempts.remove(event.id());
+        if (attempt != null) {
+            attempt.complete(null);
         }
     }
 
     @Override
     public CompletableFuture<Version> lock() {
-        RaftProxy.State state = proxy.getState();
-        if (state != RaftProxy.State.CONNECTED) {
-            return Futures.exceptionalFuture(new StorageException.Unavailable());
-        }
-
-        CompletableFuture<Version> future = new CompletableFuture<>();
-        int id = this.id.incrementAndGet();
-        futures.put(id, future);
-        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, -1)).whenComplete((result, error) -> {
+        // Create and register a new attempt and invoke the LOCK operation on the replicated state machine.
+        LockAttempt attempt = new LockAttempt();
+        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), -1)).whenComplete((result, error) -> {
             if (error != null) {
-                futures.remove(id);
-                future.completeExceptionally(error);
+                attempt.completeExceptionally(error);
             }
         });
-        return future;
+
+        // Return an ordered future that callers can safely block on inside the executor thread.
+        return orderedFuture(attempt, orderedExecutor, scheduledExecutor);
     }
 
     @Override
     public CompletableFuture<Optional<Version>> tryLock() {
+        // If the proxy is currently disconnected from the cluster, we can just fail the lock attempt here.
         RaftProxy.State state = proxy.getState();
         if (state != RaftProxy.State.CONNECTED) {
-            return Futures.exceptionalFuture(new StorageException.Unavailable());
+            return CompletableFuture.completedFuture(Optional.empty());
         }
 
-        CompletableFuture<Version> future = new CompletableFuture<>();
-        int id = this.id.incrementAndGet();
-        futures.put(id, future);
-        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, 0)).whenComplete((result, error) -> {
+        // Create and register a new attempt and invoke the LOCK operation on the replicated state machine with
+        // a 0 timeout. The timeout will cause the state machine to immediately reject the request if the lock is
+        // already owned by another process.
+        LockAttempt attempt = new LockAttempt();
+        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), 0)).whenComplete((result, error) -> {
             if (error != null) {
-                futures.remove(id);
-                future.completeExceptionally(error);
+                attempt.completeExceptionally(error);
             }
         });
-        return future.thenApply(Optional::ofNullable);
+
+        // Return an ordered future that callers can safely block on inside the executor thread.
+        return orderedFuture(attempt, orderedExecutor, scheduledExecutor)
+            .thenApply(Optional::ofNullable);
     }
 
     @Override
     public CompletableFuture<Optional<Version>> tryLock(Duration timeout) {
-        RaftProxy.State state = proxy.getState();
-        if (state != RaftProxy.State.CONNECTED) {
-            return Futures.exceptionalFuture(new StorageException.Unavailable());
-        }
-
-        CompletableFuture<Version> future = new CompletableFuture<>();
-        int id = this.id.incrementAndGet();
-        futures.put(id, future);
-        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, timeout.toMillis())).whenComplete((result, error) -> {
-            if (error != null) {
-                futures.remove(id);
-                future.completeExceptionally(error);
-            }
+        // Create a lock attempt with a client-side timeout and fail the lock if the timer expires.
+        // Because time does not progress at the same rate on different nodes, we can't guarantee that
+        // the lock won't be granted to this process after it's expired here. Thus, if this timer expires and
+        // we fail the lock on the client, we also still need to send an UNLOCK command to the cluster in case it's
+        // later granted by the cluster. Note that the semantics of the Raft client will guarantee this operation
+        // occurs after any prior LOCK attempt, and the Raft client will retry the UNLOCK request until successful.
+        // Additionally, sending the unique lock ID with the command ensures we won't accidentally unlock a different
+        // lock call also granted to this process.
+        LockAttempt attempt = new LockAttempt(timeout, a -> {
+            a.complete(null);
+            proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(a.id()));
         });
-        return future.thenApply(Optional::ofNullable);
+
+        // Invoke the LOCK operation on the replicated state machine with the given timeout. If the lock is currently
+        // held by another process, the state machine will add the attempt to a queue and publish a FAILED event if
+        // the timer expires before this process can be granted the lock. If the client cannot reach the Raft cluster,
+        // the client-side timer will expire the attempt.
+        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), timeout.toMillis()))
+            .whenComplete((result, error) -> {
+                if (error != null) {
+                    attempt.completeExceptionally(error);
+                }
+            });
+
+        // Return an ordered future that callers can safely block on inside the executor thread.
+        return orderedFuture(attempt, orderedExecutor, scheduledExecutor)
+            .thenApply(Optional::ofNullable);
     }
 
     @Override
     public CompletableFuture<Void> unlock() {
-        int lock = this.lock;
-        this.lock = 0;
+        // Use the current lock ID to ensure we only unlock the lock currently held by this process.
+        int lock = this.lock.getAndSet(0);
         if (lock != 0) {
-            return proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock));
+            return orderedFuture(
+                proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock)),
+                orderedExecutor,
+                scheduledExecutor);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -161,4 +188,65 @@
     public CompletableFuture<Void> close() {
         return proxy.close();
     }
+
+    /**
+     * Lock attempt.
+     */
+    private class LockAttempt extends CompletableFuture<Version> {
+        private final int id;
+        private final ScheduledFuture<?> scheduledFuture;
+
+        LockAttempt() {
+            this(null, null);
+        }
+
+        LockAttempt(Duration duration, Consumer<LockAttempt> callback) {
+            this.id = AtomixDistributedLock.this.id.incrementAndGet();
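+            // If a timeout and callback were given, schedule the expiration callback on the shared
+            // scheduled executor. The attempt is then registered so LOCKED/FAILED events can complete it.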
+            this.scheduledFuture = duration != null && callback != null
+                ? scheduledExecutor.schedule(() -> callback.accept(this), duration.toMillis(), TimeUnit.MILLISECONDS)
+                : null;
+            attempts.put(id, this);
+        }
+
+        /**
+         * Returns the lock attempt ID.
+         *
+         * @return the lock attempt ID
+         */
+        int id() {
+            return id;
+        }
+
+        @Override
+        public boolean complete(Version version) {
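+            // If the attempt was already completed, e.g. expired by the client-side timer, leave the
+            // existing result in place. Otherwise cancel the expiration timer, record this attempt's ID
+            // as the currently held lock when a version was granted, and complete the future.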
+            if (isDone()) {
+                return super.complete(null);
+            }
+            cancel();
+            if (version != null) {
+                lock.set(id);
+                return super.complete(version);
+            } else {
+                return super.complete(null);
+            }
+        }
+
+        @Override
+        public boolean completeExceptionally(Throwable ex) {
+            cancel();
+            return super.completeExceptionally(ex);
+        }
+
+        private void cancel() {
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+            }
+            attempts.remove(id);
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java
index 0ef9270..e6d2b2a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java
@@ -23,8 +23,8 @@
  * Raft value events.
  */
 public enum AtomixDistributedLockEvents implements EventType {
-    LOCK("lock"),
-    FAIL("fail");
+    LOCKED("lock"),
+    FAILED("fail");
 
     private final String id;
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
index a2dab95..e730ffd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
@@ -33,7 +33,8 @@
 import org.onosproject.store.service.Serializer;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
-import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAIL;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAILED;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.LOCKED;
 import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
 import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
 import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
@@ -71,6 +72,9 @@
     public void install(SnapshotReader reader) {
         lock = reader.readObject(SERIALIZER::decode);
         queue = reader.readObject(SERIALIZER::decode);
+
+        // After the snapshot is installed, we need to cancel any existing timers and schedule new ones based on the
+        // state provided by the snapshot.
         timers.values().forEach(Scheduled::cancel);
         timers.clear();
         for (LockHolder holder : queue) {
@@ -82,7 +86,7 @@
                             queue.remove(holder);
                             RaftSession session = sessions().getSession(holder.session);
                             if (session != null && session.getState().active()) {
-                                session.publish(FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
+                                session.publish(FAILED, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
                             }
                         }));
             }
@@ -105,6 +109,9 @@
      * @param commit the lock commit
      */
     protected void lock(Commit<Lock> commit) {
+        // If the lock is not already owned, immediately grant the lock to the requester.
+        // Note that we still have to publish an event to the session. The event is guaranteed to be received
+        // by the client-side primitive after the LOCK response.
         if (lock == null) {
             lock = new LockHolder(
                 commit.value().id(),
@@ -112,11 +119,14 @@
                 commit.session().sessionId().id(),
                 0);
             commit.session().publish(
-                AtomixDistributedLockEvents.LOCK,
+                LOCKED,
                 SERIALIZER::encode,
                 new LockEvent(commit.value().id(), commit.index()));
+        // If the timeout is 0, that indicates this is a tryLock request. Immediately fail the request.
         } else if (commit.value().timeout() == 0) {
-            commit.session().publish(FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
+            commit.session().publish(FAILED, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
+        // If a timeout exists, add the request to the queue and set a timer. Note that the lock request expiration
+        // time is based on the *state machine* time - not the system time - to ensure consistency across servers.
         } else if (commit.value().timeout() > 0) {
             LockHolder holder = new LockHolder(
                 commit.value().id(),
@@ -125,15 +135,19 @@
                 wallClock().getTime().unixTimestamp() + commit.value().timeout());
             queue.add(holder);
             timers.put(commit.index(), scheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> {
+                // When the lock request timer expires, remove the request from the queue and publish a FAILED
+                // event to the session. Note that this timer is guaranteed to be executed in the same thread as the
+                // state machine commands, so there's no need to use a lock here.
                 timers.remove(commit.index());
                 queue.remove(holder);
                 if (commit.session().getState().active()) {
                     commit.session().publish(
-                        FAIL,
+                        FAILED,
                         SERIALIZER::encode,
                         new LockEvent(commit.value().id(), commit.index()));
                 }
             }));
+        // If the timeout is -1, just add the request to the queue with no expiration.
         } else {
             LockHolder holder = new LockHolder(
                 commit.value().id(),
@@ -151,24 +165,35 @@
      */
     protected void unlock(Commit<Unlock> commit) {
         if (lock != null) {
+            // If the commit's session does not match the current lock holder, ignore the request.
             if (lock.session != commit.session().sessionId().id()) {
                 return;
             }
 
+            // If the current lock ID does not match the requested lock ID, ignore the request. This ensures that
+            // internal releases of locks that were never acquired by the client-side primitive do not cause
+            // legitimate locks to be unlocked.
+            if (lock.id != commit.value().id()) {
+                return;
+            }
+
+            // The lock has been released. Poll the next waiter from the queue.
             lock = queue.poll();
             while (lock != null) {
+                // If the waiter has a lock timer, cancel the timer.
                 Scheduled timer = timers.remove(lock.index);
                 if (timer != null) {
                     timer.cancel();
                 }
 
+                // If the lock session is for some reason inactive, continue on to the next waiter. Otherwise,
+                // publish a LOCKED event to the new lock holder's session.
                 RaftSession session = sessions().getSession(lock.session);
-                if (session == null || session.getState() == RaftSession.State.EXPIRED
-                    || session.getState() == RaftSession.State.CLOSED) {
+                if (session == null || !session.getState().active()) {
                     lock = queue.poll();
                 } else {
                     session.publish(
-                        AtomixDistributedLockEvents.LOCK,
+                        LOCKED,
                         SERIALIZER::encode,
                         new LockEvent(lock.id, commit.index()));
                     break;
@@ -177,29 +202,41 @@
         }
     }
 
+    /**
+     * Handles a session that has been closed by a client or expired by the cluster.
+     * <p>
+     * When a session is removed, if the session is the current lock holder then the lock is released and the next
+     * session waiting in the queue is granted the lock. Additionally, all pending lock requests for the session
+     * are removed from the lock queue.
+     *
+     * @param session the closed session
+     */
     private void releaseSession(RaftSession session) {
+        // Remove all instances of the session from the lock queue.
+        queue.removeIf(lock -> lock.session == session.sessionId().id());
+
+        // If the removed session is the current holder of the lock, nullify the lock and attempt to grant it
+        // to the next waiter in the queue.
         if (lock != null && lock.session == session.sessionId().id()) {
             lock = queue.poll();
             while (lock != null) {
-                if (lock.session == session.sessionId().id()) {
+                // If the waiter has a lock timer, cancel the timer.
+                Scheduled timer = timers.remove(lock.index);
+                if (timer != null) {
+                    timer.cancel();
+                }
+
+                // If the lock session is inactive, continue on to the next waiter. Otherwise,
+                // publish a LOCKED event to the new lock holder's session.
+                RaftSession lockSession = sessions().getSession(lock.session);
+                if (lockSession == null || !lockSession.getState().active()) {
                     lock = queue.poll();
                 } else {
-                    Scheduled timer = timers.remove(lock.index);
-                    if (timer != null) {
-                        timer.cancel();
-                    }
-
-                    RaftSession lockSession = sessions().getSession(lock.session);
-                    if (lockSession == null || lockSession.getState() == RaftSession.State.EXPIRED
-                        || lockSession.getState() == RaftSession.State.CLOSED) {
-                        lock = queue.poll();
-                    } else {
-                        lockSession.publish(
-                            AtomixDistributedLockEvents.LOCK,
-                            SERIALIZER::encode,
-                            new LockEvent(lock.id, lock.index));
-                        break;
-                    }
+                    lockSession.publish(
+                        LOCKED,
+                        SERIALIZER::encode,
+                        new LockEvent(lock.id, lock.index));
+                    break;
                 }
             }
         }