/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.resources.impl;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;

import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftServiceExecutor;
import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.utils.concurrent.Scheduled;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;

import static com.google.common.base.MoreObjects.toStringHelper;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAILED;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.LOCKED;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;

43/**
44 * Raft atomic value service.
45 */
46public class AtomixDistributedLockService extends AbstractRaftService {
47 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
48 .register(KryoNamespaces.BASIC)
49 .register(AtomixDistributedLockOperations.NAMESPACE)
50 .register(AtomixDistributedLockEvents.NAMESPACE)
51 .register(LockHolder.class)
52 .register(ArrayDeque.class)
53 .build());
54
55 private LockHolder lock;
56 private Queue<LockHolder> queue = new ArrayDeque<>();
57 private final Map<Long, Scheduled> timers = new HashMap<>();
58
59 @Override
60 protected void configure(RaftServiceExecutor executor) {
61 executor.register(LOCK, SERIALIZER::decode, this::lock);
62 executor.register(UNLOCK, SERIALIZER::decode, this::unlock);
63 }
64
65 @Override
66 public void snapshot(SnapshotWriter writer) {
67 writer.writeObject(lock, SERIALIZER::encode);
68 writer.writeObject(queue, SERIALIZER::encode);
69 }
70
71 @Override
72 public void install(SnapshotReader reader) {
73 lock = reader.readObject(SERIALIZER::decode);
74 queue = reader.readObject(SERIALIZER::decode);
Jordan Halterman12421682018-02-01 17:46:09 -080075
76 // After the snapshot is installed, we need to cancel any existing timers and schedule new ones based on the
77 // state provided by the snapshot.
Jordan Haltermana76f2312018-01-25 16:56:45 -080078 timers.values().forEach(Scheduled::cancel);
79 timers.clear();
80 for (LockHolder holder : queue) {
81 if (holder.expire > 0) {
82 timers.put(holder.index,
83 scheduler().schedule(Duration.ofMillis(holder.expire - wallClock().getTime().unixTimestamp()),
84 () -> {
85 timers.remove(holder.index);
86 queue.remove(holder);
87 RaftSession session = sessions().getSession(holder.session);
88 if (session != null && session.getState().active()) {
Jordan Halterman12421682018-02-01 17:46:09 -080089 session.publish(FAILED, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
Jordan Haltermana76f2312018-01-25 16:56:45 -080090 }
91 }));
92 }
93 }
94 }
95
96 @Override
97 public void onExpire(RaftSession session) {
98 releaseSession(session);
99 }
100
101 @Override
102 public void onClose(RaftSession session) {
103 releaseSession(session);
104 }
105
106 /**
107 * Applies a lock commit.
108 *
109 * @param commit the lock commit
110 */
111 protected void lock(Commit<Lock> commit) {
Jordan Halterman12421682018-02-01 17:46:09 -0800112 // If the lock is not already owned, immediately grant the lock to the requester.
113 // Note that we still have to publish an event to the session. The event is guaranteed to be received
114 // by the client-side primitive after the LOCK response.
Jordan Haltermana76f2312018-01-25 16:56:45 -0800115 if (lock == null) {
116 lock = new LockHolder(
117 commit.value().id(),
118 commit.index(),
119 commit.session().sessionId().id(),
120 0);
121 commit.session().publish(
Jordan Halterman12421682018-02-01 17:46:09 -0800122 LOCKED,
Jordan Haltermana76f2312018-01-25 16:56:45 -0800123 SERIALIZER::encode,
124 new LockEvent(commit.value().id(), commit.index()));
Jordan Halterman12421682018-02-01 17:46:09 -0800125 // If the timeout is 0, that indicates this is a tryLock request. Immediately fail the request.
Jordan Haltermana76f2312018-01-25 16:56:45 -0800126 } else if (commit.value().timeout() == 0) {
Jordan Halterman12421682018-02-01 17:46:09 -0800127 commit.session().publish(FAILED, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
128 // If a timeout exists, add the request to the queue and set a timer. Note that the lock request expiration
129 // time is based on the *state machine* time - not the system time - to ensure consistency across servers.
Jordan Haltermana76f2312018-01-25 16:56:45 -0800130 } else if (commit.value().timeout() > 0) {
131 LockHolder holder = new LockHolder(
132 commit.value().id(),
133 commit.index(),
134 commit.session().sessionId().id(),
135 wallClock().getTime().unixTimestamp() + commit.value().timeout());
136 queue.add(holder);
137 timers.put(commit.index(), scheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> {
Jordan Halterman12421682018-02-01 17:46:09 -0800138 // When the lock request timer expires, remove the request from the queue and publish a FAILED
139 // event to the session. Note that this timer is guaranteed to be executed in the same thread as the
140 // state machine commands, so there's no need to use a lock here.
Jordan Haltermana76f2312018-01-25 16:56:45 -0800141 timers.remove(commit.index());
142 queue.remove(holder);
143 if (commit.session().getState().active()) {
144 commit.session().publish(
Jordan Halterman12421682018-02-01 17:46:09 -0800145 FAILED,
Jordan Haltermana76f2312018-01-25 16:56:45 -0800146 SERIALIZER::encode,
147 new LockEvent(commit.value().id(), commit.index()));
148 }
149 }));
Jordan Halterman12421682018-02-01 17:46:09 -0800150 // If the lock is -1, just add the request to the queue with no expiration.
Jordan Haltermana76f2312018-01-25 16:56:45 -0800151 } else {
152 LockHolder holder = new LockHolder(
153 commit.value().id(),
154 commit.index(),
155 commit.session().sessionId().id(),
156 0);
157 queue.add(holder);
158 }
159 }
160
161 /**
162 * Applies an unlock commit.
163 *
164 * @param commit the unlock commit
165 */
166 protected void unlock(Commit<Unlock> commit) {
167 if (lock != null) {
Jordan Halterman12421682018-02-01 17:46:09 -0800168 // If the commit's session does not match the current lock holder, ignore the request.
Jordan Haltermana76f2312018-01-25 16:56:45 -0800169 if (lock.session != commit.session().sessionId().id()) {
170 return;
171 }
172
Jordan Halterman12421682018-02-01 17:46:09 -0800173 // If the current lock ID does not match the requested lock ID, ignore the request. This ensures that
174 // internal releases of locks that were never acquired by the client-side primitive do not cause
175 // legitimate locks to be unlocked.
176 if (lock.id != commit.value().id()) {
177 return;
178 }
179
180 // The lock has been released. Populate the lock from the queue.
Jordan Haltermana76f2312018-01-25 16:56:45 -0800181 lock = queue.poll();
182 while (lock != null) {
Jordan Halterman12421682018-02-01 17:46:09 -0800183 // If the waiter has a lock timer, cancel the timer.
Jordan Haltermana76f2312018-01-25 16:56:45 -0800184 Scheduled timer = timers.remove(lock.index);
185 if (timer != null) {
186 timer.cancel();
187 }
188
Jordan Halterman12421682018-02-01 17:46:09 -0800189 // If the lock session is for some reason inactive, continue on to the next waiter. Otherwise,
190 // publish a LOCKED event to the new lock holder's session.
Jordan Haltermana76f2312018-01-25 16:56:45 -0800191 RaftSession session = sessions().getSession(lock.session);
Jordan Halterman12421682018-02-01 17:46:09 -0800192 if (session == null || !session.getState().active()) {
Jordan Haltermana76f2312018-01-25 16:56:45 -0800193 lock = queue.poll();
194 } else {
195 session.publish(
Jordan Halterman12421682018-02-01 17:46:09 -0800196 LOCKED,
Jordan Haltermana76f2312018-01-25 16:56:45 -0800197 SERIALIZER::encode,
198 new LockEvent(lock.id, commit.index()));
199 break;
200 }
201 }
202 }
203 }
204
Jordan Halterman12421682018-02-01 17:46:09 -0800205 /**
206 * Handles a session that has been closed by a client or expired by the cluster.
207 * <p>
208 * When a session is removed, if the session is the current lock holder then the lock is released and the next
209 * session waiting in the queue is granted the lock. Additionally, all pending lock requests for the session
210 * are removed from the lock queue.
211 *
212 * @param session the closed session
213 */
Jordan Haltermana76f2312018-01-25 16:56:45 -0800214 private void releaseSession(RaftSession session) {
Jordan Halterman12421682018-02-01 17:46:09 -0800215 // Remove all instances of the session from the lock queue.
216 queue.removeIf(lock -> lock.session == session.sessionId().id());
217
218 // If the removed session is the current holder of the lock, nullify the lock and attempt to grant it
219 // to the next waiter in the queue.
Jordan Haltermana76f2312018-01-25 16:56:45 -0800220 if (lock != null && lock.session == session.sessionId().id()) {
221 lock = queue.poll();
222 while (lock != null) {
Jordan Halterman12421682018-02-01 17:46:09 -0800223 // If the waiter has a lock timer, cancel the timer.
224 Scheduled timer = timers.remove(lock.index);
225 if (timer != null) {
226 timer.cancel();
227 }
228
229 // If the lock session is inactive, continue on to the next waiter. Otherwise,
230 // publish a LOCKED event to the new lock holder's session.
231 RaftSession lockSession = sessions().getSession(lock.session);
232 if (lockSession == null || !lockSession.getState().active()) {
Jordan Haltermana76f2312018-01-25 16:56:45 -0800233 lock = queue.poll();
234 } else {
Jordan Halterman12421682018-02-01 17:46:09 -0800235 lockSession.publish(
236 LOCKED,
237 SERIALIZER::encode,
238 new LockEvent(lock.id, lock.index));
239 break;
Jordan Haltermana76f2312018-01-25 16:56:45 -0800240 }
241 }
242 }
243 }
244
245 private class LockHolder {
246 private final int id;
247 private final long index;
248 private final long session;
249 private final long expire;
250
251 public LockHolder(int id, long index, long session, long expire) {
252 this.id = id;
253 this.index = index;
254 this.session = session;
255 this.expire = expire;
256 }
257
258 @Override
259 public String toString() {
260 return toStringHelper(this)
261 .add("id", id)
262 .add("index", index)
263 .add("session", session)
264 .add("expire", expire)
265 .toString();
266 }
267 }
268}