Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2018-present Open Networking Foundation |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package org.onosproject.store.primitives.resources.impl; |
| 17 | |
| 18 | import java.time.Duration; |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 19 | import java.util.Map; |
| 20 | import java.util.Optional; |
| 21 | import java.util.concurrent.CompletableFuture; |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 22 | import java.util.concurrent.Executor; |
| 23 | import java.util.concurrent.ScheduledExecutorService; |
| 24 | import java.util.concurrent.ScheduledFuture; |
| 25 | import java.util.concurrent.TimeUnit; |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 26 | import java.util.concurrent.atomic.AtomicInteger; |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 27 | import java.util.function.Consumer; |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 28 | |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 29 | import com.google.common.collect.Maps; |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 30 | import io.atomix.protocols.raft.proxy.RaftProxy; |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 31 | import org.onlab.util.KryoNamespace; |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 32 | import org.onlab.util.OrderedExecutor; |
| 33 | import org.onlab.util.SharedScheduledExecutors; |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 34 | import org.onosproject.store.serializers.KryoNamespaces; |
| 35 | import org.onosproject.store.service.AsyncDistributedLock; |
| 36 | import org.onosproject.store.service.Serializer; |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 37 | import org.onosproject.store.service.Version; |
| 38 | |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 39 | import static org.onlab.util.Tools.orderedFuture; |
| 40 | import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAILED; |
| 41 | import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.LOCKED; |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 42 | import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK; |
| 43 | import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock; |
| 44 | import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK; |
| 45 | import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock; |
| 46 | |
| 47 | /** |
| 48 | * Atomix lock implementation. |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 49 | * <p> |
| 50 | * This {@link org.onosproject.store.service.DistributedLock} implementation uses a {@link RaftProxy} to interact |
| 51 | * with a {@link AtomixDistributedLockService} replicated state machine. |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 52 | */ |
| 53 | public class AtomixDistributedLock extends AbstractRaftPrimitive implements AsyncDistributedLock { |
| 54 | private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder() |
| 55 | .register(KryoNamespaces.BASIC) |
| 56 | .register(AtomixDistributedLockOperations.NAMESPACE) |
| 57 | .register(AtomixDistributedLockEvents.NAMESPACE) |
| 58 | .build()); |
| 59 | |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 60 | private final ScheduledExecutorService scheduledExecutor; |
| 61 | private final Executor orderedExecutor; |
| 62 | private final Map<Integer, LockAttempt> attempts = Maps.newConcurrentMap(); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 63 | private final AtomicInteger id = new AtomicInteger(); |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 64 | private final AtomicInteger lock = new AtomicInteger(); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 65 | |
| 66 | public AtomixDistributedLock(RaftProxy proxy) { |
| 67 | super(proxy); |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 68 | this.scheduledExecutor = SharedScheduledExecutors.getPoolThreadExecutor(); |
| 69 | this.orderedExecutor = new OrderedExecutor(scheduledExecutor); |
| 70 | proxy.addEventListener(LOCKED, SERIALIZER::decode, this::handleLocked); |
| 71 | proxy.addEventListener(FAILED, SERIALIZER::decode, this::handleFailed); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 72 | } |
| 73 | |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 74 | /** |
| 75 | * Handles a {@code LOCKED} event. |
| 76 | * |
| 77 | * @param event the event to handle |
| 78 | */ |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 79 | private void handleLocked(LockEvent event) { |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 80 | // Remove the LockAttempt from the attempts map and complete it with the lock version if it exists. |
| 81 | // If the attempt no longer exists, it likely was expired by a client-side timer. |
| 82 | LockAttempt attempt = attempts.remove(event.id()); |
| 83 | if (attempt != null) { |
| 84 | attempt.complete(new Version(event.version())); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 85 | } |
| 86 | } |
| 87 | |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 88 | /** |
| 89 | * Handles a {@code FAILED} event. |
| 90 | * |
| 91 | * @param event the event to handle |
| 92 | */ |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 93 | private void handleFailed(LockEvent event) { |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 94 | // Remove the LockAttempt from the attempts map and complete it with a null value if it exists. |
| 95 | // If the attempt no longer exists, it likely was expired by a client-side timer. |
| 96 | LockAttempt attempt = attempts.remove(event.id()); |
| 97 | if (attempt != null) { |
| 98 | attempt.complete(null); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 99 | } |
| 100 | } |
| 101 | |
| 102 | @Override |
| 103 | public CompletableFuture<Version> lock() { |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 104 | // Create and register a new attempt and invoke the LOCK operation on the replicated state machine. |
| 105 | LockAttempt attempt = new LockAttempt(); |
| 106 | proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), -1)).whenComplete((result, error) -> { |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 107 | if (error != null) { |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 108 | attempt.completeExceptionally(error); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 109 | } |
| 110 | }); |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 111 | |
| 112 | // Return an ordered future that can safely be blocked inside the executor thread. |
| 113 | return orderedFuture(attempt, orderedExecutor, scheduledExecutor); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 114 | } |
| 115 | |
| 116 | @Override |
| 117 | public CompletableFuture<Optional<Version>> tryLock() { |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 118 | // If the proxy is currently disconnected from the cluster, we can just fail the lock attempt here. |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 119 | RaftProxy.State state = proxy.getState(); |
| 120 | if (state != RaftProxy.State.CONNECTED) { |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 121 | return CompletableFuture.completedFuture(Optional.empty()); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 122 | } |
| 123 | |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 124 | // Create and register a new attempt and invoke the LOCK operation on teh replicated state machine with |
| 125 | // a 0 timeout. The timeout will cause the state machine to immediately reject the request if the lock is |
| 126 | // already owned by another process. |
| 127 | LockAttempt attempt = new LockAttempt(); |
| 128 | proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), 0)).whenComplete((result, error) -> { |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 129 | if (error != null) { |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 130 | attempt.completeExceptionally(error); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 131 | } |
| 132 | }); |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 133 | |
| 134 | // Return an ordered future that can safely be blocked inside the executor thread. |
| 135 | return orderedFuture(attempt, orderedExecutor, scheduledExecutor) |
| 136 | .thenApply(Optional::ofNullable); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 137 | } |
| 138 | |
| 139 | @Override |
| 140 | public CompletableFuture<Optional<Version>> tryLock(Duration timeout) { |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 141 | // Create a lock attempt with a client-side timeout and fail the lock if the timer expires. |
| 142 | // Because time does not progress at the same rate on different nodes, we can't guarantee that |
| 143 | // the lock won't be granted to this process after it's expired here. Thus, if this timer expires and |
| 144 | // we fail the lock on the client, we also still need to send an UNLOCK command to the cluster in case it's |
| 145 | // later granted by the cluster. Note that the semantics of the Raft client will guarantee this operation |
| 146 | // occurs after any prior LOCK attempt, and the Raft client will retry the UNLOCK request until successful. |
| 147 | // Additionally, sending the unique lock ID with the command ensures we won't accidentally unlock a different |
| 148 | // lock call also granted to this process. |
| 149 | LockAttempt attempt = new LockAttempt(timeout, a -> { |
| 150 | a.complete(null); |
| 151 | proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(a.id())); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 152 | }); |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 153 | |
| 154 | // Invoke the LOCK operation on the replicated state machine with the given timeout. If the lock is currently |
| 155 | // held by another process, the state machine will add the attempt to a queue and publish a FAILED event if |
| 156 | // the timer expires before this process can be granted the lock. If the client cannot reach the Raft cluster, |
| 157 | // the client-side timer will expire the attempt. |
| 158 | proxy.invoke(LOCK, SERIALIZER::encode, new Lock(attempt.id(), timeout.toMillis())) |
| 159 | .whenComplete((result, error) -> { |
| 160 | if (error != null) { |
| 161 | attempt.completeExceptionally(error); |
| 162 | } |
| 163 | }); |
| 164 | |
| 165 | // Return an ordered future that can safely be blocked inside the executor thread. |
| 166 | return orderedFuture(attempt, orderedExecutor, scheduledExecutor) |
| 167 | .thenApply(Optional::ofNullable); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 168 | } |
| 169 | |
| 170 | @Override |
| 171 | public CompletableFuture<Void> unlock() { |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 172 | // Use the current lock ID to ensure we only unlock the lock currently held by this process. |
| 173 | int lock = this.lock.getAndSet(0); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 174 | if (lock != 0) { |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 175 | return orderedFuture( |
| 176 | proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock)), |
| 177 | orderedExecutor, |
| 178 | scheduledExecutor); |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 179 | } |
| 180 | return CompletableFuture.completedFuture(null); |
| 181 | } |
| 182 | |
| 183 | /** |
| 184 | * Closes the lock. |
| 185 | * |
| 186 | * @return a future to be completed once the lock has been closed |
| 187 | */ |
| 188 | public CompletableFuture<Void> close() { |
| 189 | return proxy.close(); |
| 190 | } |
Jordan Halterman | 49d956c | 2018-02-01 17:46:09 -0800 | [diff] [blame] | 191 | |
| 192 | /** |
| 193 | * Lock attempt. |
| 194 | */ |
| 195 | private class LockAttempt extends CompletableFuture<Version> { |
| 196 | private final int id; |
| 197 | private final ScheduledFuture<?> scheduledFuture; |
| 198 | |
| 199 | LockAttempt() { |
| 200 | this(null, null); |
| 201 | } |
| 202 | |
| 203 | LockAttempt(Duration duration, Consumer<LockAttempt> callback) { |
| 204 | this.id = AtomixDistributedLock.this.id.incrementAndGet(); |
| 205 | this.scheduledFuture = duration != null && callback != null |
| 206 | ? scheduledExecutor.schedule(() -> callback.accept(this), duration.toMillis(), TimeUnit.MILLISECONDS) |
| 207 | : null; |
| 208 | attempts.put(id, this); |
| 209 | } |
| 210 | |
| 211 | /** |
| 212 | * Returns the lock attempt ID. |
| 213 | * |
| 214 | * @return the lock attempt ID |
| 215 | */ |
| 216 | int id() { |
| 217 | return id; |
| 218 | } |
| 219 | |
| 220 | @Override |
| 221 | public boolean complete(Version version) { |
| 222 | if (isDone()) { |
| 223 | return super.complete(null); |
| 224 | } |
| 225 | cancel(); |
| 226 | if (version != null) { |
| 227 | lock.set(id); |
| 228 | return super.complete(version); |
| 229 | } else { |
| 230 | return super.complete(null); |
| 231 | } |
| 232 | } |
| 233 | |
| 234 | @Override |
| 235 | public boolean completeExceptionally(Throwable ex) { |
| 236 | cancel(); |
| 237 | return super.completeExceptionally(ex); |
| 238 | } |
| 239 | |
| 240 | private void cancel() { |
| 241 | if (scheduledFuture != null) { |
| 242 | scheduledFuture.cancel(false); |
| 243 | } |
| 244 | attempts.remove(id); |
| 245 | } |
| 246 | } |
Jordan Halterman | 4743258 | 2018-01-25 16:56:45 -0800 | [diff] [blame] | 247 | } |