blob: a2dab953b49fecfebe755e1f2595e50c78da3272 [file] [log] [blame]
Jordan Haltermana76f2312018-01-25 16:56:45 -08001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import java.time.Duration;
19import java.util.ArrayDeque;
20import java.util.HashMap;
21import java.util.Map;
22import java.util.Queue;
23
24import io.atomix.protocols.raft.service.AbstractRaftService;
25import io.atomix.protocols.raft.service.Commit;
26import io.atomix.protocols.raft.service.RaftServiceExecutor;
27import io.atomix.protocols.raft.session.RaftSession;
28import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
29import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
30import io.atomix.utils.concurrent.Scheduled;
31import org.onlab.util.KryoNamespace;
32import org.onosproject.store.serializers.KryoNamespaces;
33import org.onosproject.store.service.Serializer;
34
35import static com.google.common.base.MoreObjects.toStringHelper;
36import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAIL;
37import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
38import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
39import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
40import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;
41
42/**
43 * Raft atomic value service.
44 */
45public class AtomixDistributedLockService extends AbstractRaftService {
46 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
47 .register(KryoNamespaces.BASIC)
48 .register(AtomixDistributedLockOperations.NAMESPACE)
49 .register(AtomixDistributedLockEvents.NAMESPACE)
50 .register(LockHolder.class)
51 .register(ArrayDeque.class)
52 .build());
53
54 private LockHolder lock;
55 private Queue<LockHolder> queue = new ArrayDeque<>();
56 private final Map<Long, Scheduled> timers = new HashMap<>();
57
58 @Override
59 protected void configure(RaftServiceExecutor executor) {
60 executor.register(LOCK, SERIALIZER::decode, this::lock);
61 executor.register(UNLOCK, SERIALIZER::decode, this::unlock);
62 }
63
64 @Override
65 public void snapshot(SnapshotWriter writer) {
66 writer.writeObject(lock, SERIALIZER::encode);
67 writer.writeObject(queue, SERIALIZER::encode);
68 }
69
70 @Override
71 public void install(SnapshotReader reader) {
72 lock = reader.readObject(SERIALIZER::decode);
73 queue = reader.readObject(SERIALIZER::decode);
74 timers.values().forEach(Scheduled::cancel);
75 timers.clear();
76 for (LockHolder holder : queue) {
77 if (holder.expire > 0) {
78 timers.put(holder.index,
79 scheduler().schedule(Duration.ofMillis(holder.expire - wallClock().getTime().unixTimestamp()),
80 () -> {
81 timers.remove(holder.index);
82 queue.remove(holder);
83 RaftSession session = sessions().getSession(holder.session);
84 if (session != null && session.getState().active()) {
85 session.publish(FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
86 }
87 }));
88 }
89 }
90 }
91
92 @Override
93 public void onExpire(RaftSession session) {
94 releaseSession(session);
95 }
96
97 @Override
98 public void onClose(RaftSession session) {
99 releaseSession(session);
100 }
101
102 /**
103 * Applies a lock commit.
104 *
105 * @param commit the lock commit
106 */
107 protected void lock(Commit<Lock> commit) {
108 if (lock == null) {
109 lock = new LockHolder(
110 commit.value().id(),
111 commit.index(),
112 commit.session().sessionId().id(),
113 0);
114 commit.session().publish(
115 AtomixDistributedLockEvents.LOCK,
116 SERIALIZER::encode,
117 new LockEvent(commit.value().id(), commit.index()));
118 } else if (commit.value().timeout() == 0) {
119 commit.session().publish(FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
120 } else if (commit.value().timeout() > 0) {
121 LockHolder holder = new LockHolder(
122 commit.value().id(),
123 commit.index(),
124 commit.session().sessionId().id(),
125 wallClock().getTime().unixTimestamp() + commit.value().timeout());
126 queue.add(holder);
127 timers.put(commit.index(), scheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> {
128 timers.remove(commit.index());
129 queue.remove(holder);
130 if (commit.session().getState().active()) {
131 commit.session().publish(
132 FAIL,
133 SERIALIZER::encode,
134 new LockEvent(commit.value().id(), commit.index()));
135 }
136 }));
137 } else {
138 LockHolder holder = new LockHolder(
139 commit.value().id(),
140 commit.index(),
141 commit.session().sessionId().id(),
142 0);
143 queue.add(holder);
144 }
145 }
146
147 /**
148 * Applies an unlock commit.
149 *
150 * @param commit the unlock commit
151 */
152 protected void unlock(Commit<Unlock> commit) {
153 if (lock != null) {
154 if (lock.session != commit.session().sessionId().id()) {
155 return;
156 }
157
158 lock = queue.poll();
159 while (lock != null) {
160 Scheduled timer = timers.remove(lock.index);
161 if (timer != null) {
162 timer.cancel();
163 }
164
165 RaftSession session = sessions().getSession(lock.session);
166 if (session == null || session.getState() == RaftSession.State.EXPIRED
167 || session.getState() == RaftSession.State.CLOSED) {
168 lock = queue.poll();
169 } else {
170 session.publish(
171 AtomixDistributedLockEvents.LOCK,
172 SERIALIZER::encode,
173 new LockEvent(lock.id, commit.index()));
174 break;
175 }
176 }
177 }
178 }
179
180 private void releaseSession(RaftSession session) {
181 if (lock != null && lock.session == session.sessionId().id()) {
182 lock = queue.poll();
183 while (lock != null) {
184 if (lock.session == session.sessionId().id()) {
185 lock = queue.poll();
186 } else {
187 Scheduled timer = timers.remove(lock.index);
188 if (timer != null) {
189 timer.cancel();
190 }
191
192 RaftSession lockSession = sessions().getSession(lock.session);
193 if (lockSession == null || lockSession.getState() == RaftSession.State.EXPIRED
194 || lockSession.getState() == RaftSession.State.CLOSED) {
195 lock = queue.poll();
196 } else {
197 lockSession.publish(
198 AtomixDistributedLockEvents.LOCK,
199 SERIALIZER::encode,
200 new LockEvent(lock.id, lock.index));
201 break;
202 }
203 }
204 }
205 }
206 }
207
208 private class LockHolder {
209 private final int id;
210 private final long index;
211 private final long session;
212 private final long expire;
213
214 public LockHolder(int id, long index, long session, long expire) {
215 this.id = id;
216 this.index = index;
217 this.session = session;
218 this.expire = expire;
219 }
220
221 @Override
222 public String toString() {
223 return toStringHelper(this)
224 .add("id", id)
225 .add("index", index)
226 .add("session", session)
227 .add("expire", expire)
228 .toString();
229 }
230 }
231}