blob: c234be345a33dafcad65614efba536fc79e8c879 [file] [log] [blame]
Jordan Halterman47432582018-01-25 16:56:45 -08001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import java.time.Duration;
Jordan Halterman47432582018-01-25 16:56:45 -080019import java.util.Map;
20import java.util.Optional;
21import java.util.concurrent.CompletableFuture;
Jordan Halterman49d956c2018-02-01 17:46:09 -080022import java.util.concurrent.Executor;
23import java.util.concurrent.ScheduledExecutorService;
24import java.util.concurrent.ScheduledFuture;
25import java.util.concurrent.TimeUnit;
Jordan Halterman47432582018-01-25 16:56:45 -080026import java.util.concurrent.atomic.AtomicInteger;
Jordan Halterman49d956c2018-02-01 17:46:09 -080027import java.util.function.Consumer;
Jordan Halterman47432582018-01-25 16:56:45 -080028
Jordan Halterman49d956c2018-02-01 17:46:09 -080029import com.google.common.collect.Maps;
Jordan Halterman47432582018-01-25 16:56:45 -080030import io.atomix.protocols.raft.proxy.RaftProxy;
Jordan Halterman47432582018-01-25 16:56:45 -080031import org.onlab.util.KryoNamespace;
Jordan Halterman49d956c2018-02-01 17:46:09 -080032import org.onlab.util.OrderedExecutor;
33import org.onlab.util.SharedScheduledExecutors;
Jordan Halterman47432582018-01-25 16:56:45 -080034import org.onosproject.store.serializers.KryoNamespaces;
35import org.onosproject.store.service.AsyncDistributedLock;
36import org.onosproject.store.service.Serializer;
Jordan Halterman47432582018-01-25 16:56:45 -080037import org.onosproject.store.service.Version;
38
Jordan Halterman49d956c2018-02-01 17:46:09 -080039import static org.onlab.util.Tools.orderedFuture;
40import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAILED;
41import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.LOCKED;
Jordan Halterman47432582018-01-25 16:56:45 -080042import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
43import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
44import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
45import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;
46
47/**
48 * Atomix lock implementation.
Jordan Halterman49d956c2018-02-01 17:46:09 -080049 * <p>
50 * This {@link org.onosproject.store.service.DistributedLock} implementation uses a {@link RaftProxy} to interact
51 * with a {@link AtomixDistributedLockService} replicated state machine.
Jordan Halterman47432582018-01-25 16:56:45 -080052 */
53public class AtomixDistributedLock extends AbstractRaftPrimitive implements AsyncDistributedLock {
54 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
55 .register(KryoNamespaces.BASIC)
56 .register(AtomixDistributedLockOperations.NAMESPACE)
57 .register(AtomixDistributedLockEvents.NAMESPACE)
58 .build());
59
Jordan Halterman49d956c2018-02-01 17:46:09 -080060 private final ScheduledExecutorService scheduledExecutor;
61 private final Executor orderedExecutor;
62 private final Map<Integer, LockAttempt> attempts = Maps.newConcurrentMap();
Jordan Halterman47432582018-01-25 16:56:45 -080063 private final AtomicInteger id = new AtomicInteger();
Jordan Halterman49d956c2018-02-01 17:46:09 -080064 private final AtomicInteger lock = new AtomicInteger();
Jordan Halterman47432582018-01-25 16:56:45 -080065
66 public AtomixDistributedLock(RaftProxy proxy) {
67 super(proxy);
Jordan Halterman49d956c2018-02-01 17:46:09 -080068 this.scheduledExecutor = SharedScheduledExecutors.getPoolThreadExecutor();
69 this.orderedExecutor = new OrderedExecutor(scheduledExecutor);
70 proxy.addEventListener(LOCKED, SERIALIZER::decode, this::handleLocked);
71 proxy.addEventListener(FAILED, SERIALIZER::decode, this::handleFailed);
Jordan Halterman47432582018-01-25 16:56:45 -080072 }
73
Jordan Halterman49d956c2018-02-01 17:46:09 -080074 /**
75 * Handles a {@code LOCKED} event.
76 *
77 * @param event the event to handle
78 */
Jordan Halterman47432582018-01-25 16:56:45 -080079 private void handleLocked(LockEvent event) {
Jordan Halterman49d956c2018-02-01 17:46:09 -080080 // Remove the LockAttempt from the attempts map and complete it with the lock version if it exists.
81 // If the attempt no longer exists, it likely was expired by a client-side timer.
82 LockAttempt attempt = attempts.remove(event.id());
83 if (attempt != null) {
84 attempt.complete(new Version(event.version()));
Jordan Halterman47432582018-01-25 16:56:45 -080085 }
86 }
87
Jordan Halterman49d956c2018-02-01 17:46:09 -080088 /**
89 * Handles a {@code FAILED} event.
90 *
91 * @param event the event to handle
92 */
Jordan Halterman47432582018-01-25 16:56:45 -080093 private void handleFailed(LockEvent event) {
Jordan Halterman49d956c2018-02-01 17:46:09 -080094 // Remove the LockAttempt from the attempts map and complete it with a null value if it exists.
95 // If the attempt no longer exists, it likely was expired by a client-side timer.
96 LockAttempt attempt = attempts.remove(event.id());
97 if (attempt != null) {
98 attempt.complete(null);
Jordan Halterman47432582018-01-25 16:56:45 -080099 }
100 }
101
102 @Override
103 public CompletableFuture<Version> lock() {
Jordan Halterman49d956c2018-02-01 17:46:09 -0800104 // Create and register a new attempt and invoke the LOCK operation on the replicated state machine.
105 LockAttempt attempt = new LockAttempt();
106 proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), -1)).whenComplete((result, error) -> {
Jordan Halterman47432582018-01-25 16:56:45 -0800107 if (error != null) {
Jordan Halterman49d956c2018-02-01 17:46:09 -0800108 attempt.completeExceptionally(error);
Jordan Halterman47432582018-01-25 16:56:45 -0800109 }
110 });
Jordan Halterman49d956c2018-02-01 17:46:09 -0800111
112 // Return an ordered future that can safely be blocked inside the executor thread.
113 return orderedFuture(attempt, orderedExecutor, scheduledExecutor);
Jordan Halterman47432582018-01-25 16:56:45 -0800114 }
115
116 @Override
117 public CompletableFuture<Optional<Version>> tryLock() {
Jordan Halterman49d956c2018-02-01 17:46:09 -0800118 // If the proxy is currently disconnected from the cluster, we can just fail the lock attempt here.
Jordan Halterman47432582018-01-25 16:56:45 -0800119 RaftProxy.State state = proxy.getState();
120 if (state != RaftProxy.State.CONNECTED) {
Jordan Halterman49d956c2018-02-01 17:46:09 -0800121 return CompletableFuture.completedFuture(Optional.empty());
Jordan Halterman47432582018-01-25 16:56:45 -0800122 }
123
Jordan Halterman49d956c2018-02-01 17:46:09 -0800124 // Create and register a new attempt and invoke the LOCK operation on teh replicated state machine with
125 // a 0 timeout. The timeout will cause the state machine to immediately reject the request if the lock is
126 // already owned by another process.
127 LockAttempt attempt = new LockAttempt();
128 proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), 0)).whenComplete((result, error) -> {
Jordan Halterman47432582018-01-25 16:56:45 -0800129 if (error != null) {
Jordan Halterman49d956c2018-02-01 17:46:09 -0800130 attempt.completeExceptionally(error);
Jordan Halterman47432582018-01-25 16:56:45 -0800131 }
132 });
Jordan Halterman49d956c2018-02-01 17:46:09 -0800133
134 // Return an ordered future that can safely be blocked inside the executor thread.
135 return orderedFuture(attempt, orderedExecutor, scheduledExecutor)
136 .thenApply(Optional::ofNullable);
Jordan Halterman47432582018-01-25 16:56:45 -0800137 }
138
139 @Override
140 public CompletableFuture<Optional<Version>> tryLock(Duration timeout) {
Jordan Halterman49d956c2018-02-01 17:46:09 -0800141 // Create a lock attempt with a client-side timeout and fail the lock if the timer expires.
142 // Because time does not progress at the same rate on different nodes, we can't guarantee that
143 // the lock won't be granted to this process after it's expired here. Thus, if this timer expires and
144 // we fail the lock on the client, we also still need to send an UNLOCK command to the cluster in case it's
145 // later granted by the cluster. Note that the semantics of the Raft client will guarantee this operation
146 // occurs after any prior LOCK attempt, and the Raft client will retry the UNLOCK request until successful.
147 // Additionally, sending the unique lock ID with the command ensures we won't accidentally unlock a different
148 // lock call also granted to this process.
149 LockAttempt attempt = new LockAttempt(timeout, a -> {
150 a.complete(null);
151 proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(a.id()));
Jordan Halterman47432582018-01-25 16:56:45 -0800152 });
Jordan Halterman49d956c2018-02-01 17:46:09 -0800153
154 // Invoke the LOCK operation on the replicated state machine with the given timeout. If the lock is currently
155 // held by another process, the state machine will add the attempt to a queue and publish a FAILED event if
156 // the timer expires before this process can be granted the lock. If the client cannot reach the Raft cluster,
157 // the client-side timer will expire the attempt.
158 proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), timeout.toMillis()))
159 .whenComplete((result, error) -> {
160 if (error != null) {
161 attempt.completeExceptionally(error);
162 }
163 });
164
165 // Return an ordered future that can safely be blocked inside the executor thread.
166 return orderedFuture(attempt, orderedExecutor, scheduledExecutor)
167 .thenApply(Optional::ofNullable);
Jordan Halterman47432582018-01-25 16:56:45 -0800168 }
169
170 @Override
171 public CompletableFuture<Void> unlock() {
Jordan Halterman49d956c2018-02-01 17:46:09 -0800172 // Use the current lock ID to ensure we only unlock the lock currently held by this process.
173 int lock = this.lock.getAndSet(0);
Jordan Halterman47432582018-01-25 16:56:45 -0800174 if (lock != 0) {
Jordan Halterman49d956c2018-02-01 17:46:09 -0800175 return orderedFuture(
176 proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock)),
177 orderedExecutor,
178 scheduledExecutor);
Jordan Halterman47432582018-01-25 16:56:45 -0800179 }
180 return CompletableFuture.completedFuture(null);
181 }
182
183 /**
184 * Closes the lock.
185 *
186 * @return a future to be completed once the lock has been closed
187 */
188 public CompletableFuture<Void> close() {
189 return proxy.close();
190 }
Jordan Halterman49d956c2018-02-01 17:46:09 -0800191
192 /**
193 * Lock attempt.
194 */
195 private class LockAttempt extends CompletableFuture<Version> {
196 private final int id;
197 private final ScheduledFuture<?> scheduledFuture;
198
199 LockAttempt() {
200 this(null, null);
201 }
202
203 LockAttempt(Duration duration, Consumer<LockAttempt> callback) {
204 this.id = AtomixDistributedLock.this.id.incrementAndGet();
205 this.scheduledFuture = duration != null && callback != null
206 ? scheduledExecutor.schedule(() -> callback.accept(this), duration.toMillis(), TimeUnit.MILLISECONDS)
207 : null;
208 attempts.put(id, this);
209 }
210
211 /**
212 * Returns the lock attempt ID.
213 *
214 * @return the lock attempt ID
215 */
216 int id() {
217 return id;
218 }
219
220 @Override
221 public boolean complete(Version version) {
222 if (isDone()) {
223 return super.complete(null);
224 }
225 cancel();
226 if (version != null) {
227 lock.set(id);
228 return super.complete(version);
229 } else {
230 return super.complete(null);
231 }
232 }
233
234 @Override
235 public boolean completeExceptionally(Throwable ex) {
236 cancel();
237 return super.completeExceptionally(ex);
238 }
239
240 private void cancel() {
241 if (scheduledFuture != null) {
242 scheduledFuture.cancel(false);
243 }
244 attempts.remove(id);
245 }
246 }
Jordan Halterman47432582018-01-25 16:56:45 -0800247}