blob: c5a425006e208f0382a1bc037dcd3a6af2c86cd4 [file] [log] [blame]
Jordan Haltermana76f2312018-01-25 16:56:45 -08001/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import java.time.Duration;
19import java.util.Iterator;
20import java.util.Map;
21import java.util.Optional;
22import java.util.concurrent.CompletableFuture;
23import java.util.concurrent.ConcurrentHashMap;
24import java.util.concurrent.atomic.AtomicInteger;
25
26import io.atomix.protocols.raft.proxy.RaftProxy;
27import io.atomix.utils.concurrent.Futures;
28import org.onlab.util.KryoNamespace;
29import org.onosproject.store.serializers.KryoNamespaces;
30import org.onosproject.store.service.AsyncDistributedLock;
31import org.onosproject.store.service.Serializer;
32import org.onosproject.store.service.StorageException;
33import org.onosproject.store.service.Version;
34
35import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
36import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
37import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
38import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;
39
40/**
41 * Atomix lock implementation.
42 */
43public class AtomixDistributedLock extends AbstractRaftPrimitive implements AsyncDistributedLock {
44 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
45 .register(KryoNamespaces.BASIC)
46 .register(AtomixDistributedLockOperations.NAMESPACE)
47 .register(AtomixDistributedLockEvents.NAMESPACE)
48 .build());
49
50 private final Map<Integer, CompletableFuture<Version>> futures = new ConcurrentHashMap<>();
51 private final AtomicInteger id = new AtomicInteger();
52 private int lock;
53
54 public AtomixDistributedLock(RaftProxy proxy) {
55 super(proxy);
56 proxy.addStateChangeListener(this::handleStateChange);
57 proxy.addEventListener(AtomixDistributedLockEvents.LOCK, SERIALIZER::decode, this::handleLocked);
58 proxy.addEventListener(AtomixDistributedLockEvents.FAIL, SERIALIZER::decode, this::handleFailed);
59 }
60
61 private void handleLocked(LockEvent event) {
62 CompletableFuture<Version> future = futures.remove(event.id());
63 if (future != null) {
64 this.lock = event.id();
65 future.complete(new Version(event.version()));
66 }
67 }
68
69 private void handleFailed(LockEvent event) {
70 CompletableFuture<Version> future = futures.remove(event.id());
71 if (future != null) {
72 future.complete(null);
73 }
74 }
75
76 private void handleStateChange(RaftProxy.State state) {
77 if (state != RaftProxy.State.CONNECTED) {
78 Iterator<Map.Entry<Integer, CompletableFuture<Version>>> iterator = futures.entrySet().iterator();
79 while (iterator.hasNext()) {
80 Map.Entry<Integer, CompletableFuture<Version>> entry = iterator.next();
81 entry.getValue().completeExceptionally(new StorageException.Unavailable());
82 proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(entry.getKey()));
83 iterator.remove();
84 }
85 lock = 0;
86 }
87 }
88
89 @Override
90 public CompletableFuture<Version> lock() {
91 RaftProxy.State state = proxy.getState();
92 if (state != RaftProxy.State.CONNECTED) {
93 return Futures.exceptionalFuture(new StorageException.Unavailable());
94 }
95
96 CompletableFuture<Version> future = new CompletableFuture<>();
97 int id = this.id.incrementAndGet();
98 futures.put(id, future);
99 proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, -1)).whenComplete((result, error) -> {
100 if (error != null) {
101 futures.remove(id);
102 future.completeExceptionally(error);
103 }
104 });
105 return future;
106 }
107
108 @Override
109 public CompletableFuture<Optional<Version>> tryLock() {
110 RaftProxy.State state = proxy.getState();
111 if (state != RaftProxy.State.CONNECTED) {
112 return Futures.exceptionalFuture(new StorageException.Unavailable());
113 }
114
115 CompletableFuture<Version> future = new CompletableFuture<>();
116 int id = this.id.incrementAndGet();
117 futures.put(id, future);
118 proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, 0)).whenComplete((result, error) -> {
119 if (error != null) {
120 futures.remove(id);
121 future.completeExceptionally(error);
122 }
123 });
124 return future.thenApply(Optional::ofNullable);
125 }
126
127 @Override
128 public CompletableFuture<Optional<Version>> tryLock(Duration timeout) {
129 RaftProxy.State state = proxy.getState();
130 if (state != RaftProxy.State.CONNECTED) {
131 return Futures.exceptionalFuture(new StorageException.Unavailable());
132 }
133
134 CompletableFuture<Version> future = new CompletableFuture<>();
135 int id = this.id.incrementAndGet();
136 futures.put(id, future);
137 proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, timeout.toMillis())).whenComplete((result, error) -> {
138 if (error != null) {
139 futures.remove(id);
140 future.completeExceptionally(error);
141 }
142 });
143 return future.thenApply(Optional::ofNullable);
144 }
145
146 @Override
147 public CompletableFuture<Void> unlock() {
148 int lock = this.lock;
149 this.lock = 0;
150 if (lock != 0) {
151 return proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock));
152 }
153 return CompletableFuture.completedFuture(null);
154 }
155
156 /**
157 * Closes the lock.
158 *
159 * @return a future to be completed once the lock has been closed
160 */
161 public CompletableFuture<Void> close() {
162 return proxy.close();
163 }
164}