Add client-side timer for blocking on DistributedLock#tryLock(Duration) while the partition is unavailable

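For reference, the intended caller-side usage of the timeout-based acquisition
looks roughly like the sketch below. The sketch assumes the synchronous
DistributedLock wrapper exposes Optional<Version> tryLock(Duration) and
unlock(), mirroring the AsyncDistributedLock methods changed here; the helper
class and method names are illustrative only.

    import java.time.Duration;
    import java.util.Optional;

    import org.onosproject.store.service.DistributedLock;
    import org.onosproject.store.service.Version;

    public final class LockUsageSketch {
        private LockUsageSketch() {
        }

        // Runs the task while holding the lock, giving up after the timeout.
        public static void runWithLock(DistributedLock lock, Runnable task) {
            // With the client-side timer added by this change, this call completes
            // with an empty Optional once the timeout elapses even if the partition
            // backing the lock is unavailable, instead of blocking indefinitely.
            Optional<Version> version = lock.tryLock(Duration.ofSeconds(5));
            if (version.isPresent()) {
                try {
                    task.run();
                } finally {
                    lock.unlock();
                }
            } else {
                // Lock not acquired within the timeout; fall back or retry.
            }
        }
    }
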
Change-Id: Iccae74ff2139d83525a7ff185d916826014914aa
(cherry picked from commit 1242168c763bb3c8d915ca4f377abc4729879e93)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
index c5a4250..c234be3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
@@ -16,22 +16,29 @@
 package org.onosproject.store.primitives.resources.impl;
 
 import java.time.Duration;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
+import com.google.common.collect.Maps;
 import io.atomix.protocols.raft.proxy.RaftProxy;
-import io.atomix.utils.concurrent.Futures;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.OrderedExecutor;
+import org.onlab.util.SharedScheduledExecutors;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncDistributedLock;
 import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageException;
 import org.onosproject.store.service.Version;
 
+import static org.onlab.util.Tools.orderedFuture;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAILED;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.LOCKED;
 import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
 import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
 import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
@@ -39,6 +46,9 @@
 
 /**
  * Atomix lock implementation.
+ * <p>
+ * This {@link org.onosproject.store.service.DistributedLock} implementation uses a {@link RaftProxy} to interact
+ * with an {@link AtomixDistributedLockService} replicated state machine.
  */
 public class AtomixDistributedLock extends AbstractRaftPrimitive implements AsyncDistributedLock {
     private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
@@ -47,108 +57,125 @@
         .register(AtomixDistributedLockEvents.NAMESPACE)
         .build());
 
-    private final Map<Integer, CompletableFuture<Version>> futures = new ConcurrentHashMap<>();
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Executor orderedExecutor;
+    private final Map<Integer, LockAttempt> attempts = Maps.newConcurrentMap();
     private final AtomicInteger id = new AtomicInteger();
-    private int lock;
+    private final AtomicInteger lock = new AtomicInteger();
 
     public AtomixDistributedLock(RaftProxy proxy) {
         super(proxy);
-        proxy.addStateChangeListener(this::handleStateChange);
-        proxy.addEventListener(AtomixDistributedLockEvents.LOCK, SERIALIZER::decode, this::handleLocked);
-        proxy.addEventListener(AtomixDistributedLockEvents.FAIL, SERIALIZER::decode, this::handleFailed);
+        this.scheduledExecutor = SharedScheduledExecutors.getPoolThreadExecutor();
+        this.orderedExecutor = new OrderedExecutor(scheduledExecutor);
+        proxy.addEventListener(LOCKED, SERIALIZER::decode, this::handleLocked);
+        proxy.addEventListener(FAILED, SERIALIZER::decode, this::handleFailed);
     }
 
+    /**
+     * Handles a {@code LOCKED} event.
+     *
+     * @param event the event to handle
+     */
     private void handleLocked(LockEvent event) {
-        CompletableFuture<Version> future = futures.remove(event.id());
-        if (future != null) {
-            this.lock = event.id();
-            future.complete(new Version(event.version()));
+        // Remove the LockAttempt from the attempts map and complete it with the lock version if it exists.
+        // If the attempt no longer exists, it likely was expired by a client-side timer.
+        LockAttempt attempt = attempts.remove(event.id());
+        if (attempt != null) {
+            attempt.complete(new Version(event.version()));
         }
     }
 
+    /**
+     * Handles a {@code FAILED} event.
+     *
+     * @param event the event to handle
+     */
     private void handleFailed(LockEvent event) {
-        CompletableFuture<Version> future = futures.remove(event.id());
-        if (future != null) {
-            future.complete(null);
-        }
-    }
-
-    private void handleStateChange(RaftProxy.State state) {
-        if (state != RaftProxy.State.CONNECTED) {
-            Iterator<Map.Entry<Integer, CompletableFuture<Version>>> iterator = futures.entrySet().iterator();
-            while (iterator.hasNext()) {
-                Map.Entry<Integer, CompletableFuture<Version>> entry = iterator.next();
-                entry.getValue().completeExceptionally(new StorageException.Unavailable());
-                proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(entry.getKey()));
-                iterator.remove();
-            }
-            lock = 0;
+        // Remove the LockAttempt from the attempts map and complete it with a null value if it exists.
+        // If the attempt no longer exists, it likely was expired by a client-side timer.
+        LockAttempt attempt = attempts.remove(event.id());
+        if (attempt != null) {
+            attempt.complete(null);
         }
     }
 
     @Override
     public CompletableFuture<Version> lock() {
-        RaftProxy.State state = proxy.getState();
-        if (state != RaftProxy.State.CONNECTED) {
-            return Futures.exceptionalFuture(new StorageException.Unavailable());
-        }
-
-        CompletableFuture<Version> future = new CompletableFuture<>();
-        int id = this.id.incrementAndGet();
-        futures.put(id, future);
-        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, -1)).whenComplete((result, error) -> {
+        // Create and register a new attempt and invoke the LOCK operation on the replicated state machine.
+        LockAttempt attempt = new LockAttempt();
+        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), -1)).whenComplete((result, error) -> {
             if (error != null) {
-                futures.remove(id);
-                future.completeExceptionally(error);
+                attempt.completeExceptionally(error);
             }
         });
-        return future;
+
+        // Return an ordered future on which callers can safely block from inside an executor thread.
+        return orderedFuture(attempt, orderedExecutor, scheduledExecutor);
     }
 
     @Override
     public CompletableFuture<Optional<Version>> tryLock() {
+        // If the proxy is currently disconnected from the cluster, we can just fail the lock attempt here.
         RaftProxy.State state = proxy.getState();
         if (state != RaftProxy.State.CONNECTED) {
-            return Futures.exceptionalFuture(new StorageException.Unavailable());
+            return CompletableFuture.completedFuture(Optional.empty());
         }
 
-        CompletableFuture<Version> future = new CompletableFuture<>();
-        int id = this.id.incrementAndGet();
-        futures.put(id, future);
-        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, 0)).whenComplete((result, error) -> {
+        // Create and register a new attempt and invoke the LOCK operation on the replicated state machine with
+        // a 0 timeout. The timeout will cause the state machine to immediately reject the request if the lock is
+        // already owned by another process.
+        LockAttempt attempt = new LockAttempt();
+        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), 0)).whenComplete((result, error) -> {
             if (error != null) {
-                futures.remove(id);
-                future.completeExceptionally(error);
+                attempt.completeExceptionally(error);
             }
         });
-        return future.thenApply(Optional::ofNullable);
+
+        // Return an ordered future on which callers can safely block from inside an executor thread.
+        return orderedFuture(attempt, orderedExecutor, scheduledExecutor)
+            .thenApply(Optional::ofNullable);
     }
 
     @Override
     public CompletableFuture<Optional<Version>> tryLock(Duration timeout) {
-        RaftProxy.State state = proxy.getState();
-        if (state != RaftProxy.State.CONNECTED) {
-            return Futures.exceptionalFuture(new StorageException.Unavailable());
-        }
-
-        CompletableFuture<Version> future = new CompletableFuture<>();
-        int id = this.id.incrementAndGet();
-        futures.put(id, future);
-        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, timeout.toMillis())).whenComplete((result, error) -> {
-            if (error != null) {
-                futures.remove(id);
-                future.completeExceptionally(error);
-            }
+        // Create a lock attempt with a client-side timeout and fail the lock if the timer expires.
+        // Because time does not progress at the same rate on different nodes, we can't guarantee that
+        // the lock won't be granted to this process after it's expired here. Thus, if this timer expires and
+        // we fail the lock on the client, we also still need to send an UNLOCK command to the cluster in case it's
+        // later granted by the cluster. Note that the semantics of the Raft client will guarantee this operation
+        // occurs after any prior LOCK attempt, and the Raft client will retry the UNLOCK request until successful.
+        // Additionally, sending the unique lock ID with the command ensures we won't accidentally unlock a different
+        // lock call also granted to this process.
+        LockAttempt attempt = new LockAttempt(timeout, a -> {
+            a.complete(null);
+            proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(a.id()));
         });
-        return future.thenApply(Optional::ofNullable);
+
+        // Invoke the LOCK operation on the replicated state machine with the given timeout. If the lock is currently
+        // held by another process, the state machine will add the attempt to a queue and publish a FAILED event if
+        // the timer expires before this process can be granted the lock. If the client cannot reach the Raft cluster,
+        // the client-side timer will expire the attempt.
+        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), timeout.toMillis()))
+            .whenComplete((result, error) -> {
+                if (error != null) {
+                    attempt.completeExceptionally(error);
+                }
+            });
+
+        // Return an ordered future on which callers can safely block from inside an executor thread.
+        return orderedFuture(attempt, orderedExecutor, scheduledExecutor)
+            .thenApply(Optional::ofNullable);
     }
 
     @Override
     public CompletableFuture<Void> unlock() {
-        int lock = this.lock;
-        this.lock = 0;
+        // Use the current lock ID to ensure we only unlock the lock currently held by this process.
+        int lock = this.lock.getAndSet(0);
         if (lock != 0) {
-            return proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock));
+            return orderedFuture(
+                proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock)),
+                orderedExecutor,
+                scheduledExecutor);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -161,4 +188,60 @@
     public CompletableFuture<Void> close() {
         return proxy.close();
     }
+
+    /**
+     * Lock attempt.
+     */
+    private class LockAttempt extends CompletableFuture<Version> {
+        private final int id;
+        private final ScheduledFuture<?> scheduledFuture;
+
+        LockAttempt() {
+            this(null, null);
+        }
+
+        LockAttempt(Duration duration, Consumer<LockAttempt> callback) {
+            this.id = AtomixDistributedLock.this.id.incrementAndGet();
+            this.scheduledFuture = duration != null && callback != null
+                ? scheduledExecutor.schedule(() -> callback.accept(this), duration.toMillis(), TimeUnit.MILLISECONDS)
+                : null;
+            attempts.put(id, this);
+        }
+
+        /**
+         * Returns the lock attempt ID.
+         *
+         * @return the lock attempt ID
+         */
+        int id() {
+            return id;
+        }
+
+        @Override
+        public boolean complete(Version version) {
+            if (isDone()) {
+                return super.complete(null);
+            }
+            cancel();
+            if (version != null) {
+                lock.set(id);
+                return super.complete(version);
+            } else {
+                return super.complete(null);
+            }
+        }
+
+        @Override
+        public boolean completeExceptionally(Throwable ex) {
+            cancel();
+            return super.completeExceptionally(ex);
+        }
+
+        private void cancel() {
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+            }
+            attempts.remove(id);
+        }
+    }
 }
\ No newline at end of file