blob: e730ffda2ff85e845078af8af5f193b07b546c40 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftServiceExecutor;
import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.utils.concurrent.Scheduled;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import static com.google.common.base.MoreObjects.toStringHelper;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAILED;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.LOCKED;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;
/**
 * Raft distributed lock service.
 */
public class AtomixDistributedLockService extends AbstractRaftService {
    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
        .register(KryoNamespaces.BASIC)
        .register(AtomixDistributedLockOperations.NAMESPACE)
        .register(AtomixDistributedLockEvents.NAMESPACE)
        .register(LockHolder.class)
        .register(ArrayDeque.class)
        .build());

    // The current holder of the lock, or null if the lock is currently free.
    private LockHolder lock;

    // FIFO queue of pending lock requests waiting for the lock to be released.
    private Queue<LockHolder> queue = new ArrayDeque<>();

    // Expiration timers for pending tryLock attempts, keyed by the commit index of the attempt.
    // Timers are transient: they are not snapshotted and are rebuilt in install().
    private final Map<Long, Scheduled> timers = new HashMap<>();

    @Override
    protected void configure(RaftServiceExecutor executor) {
        executor.register(LOCK, SERIALIZER::decode, this::lock);
        executor.register(UNLOCK, SERIALIZER::decode, this::unlock);
    }

    @Override
    public void snapshot(SnapshotWriter writer) {
        // Only the replicated state (holder + wait queue) is written; timers are
        // deterministically re-created from the queue when the snapshot is installed.
        writer.writeObject(lock, SERIALIZER::encode);
        writer.writeObject(queue, SERIALIZER::encode);
    }

    @Override
    public void install(SnapshotReader reader) {
        lock = reader.readObject(SERIALIZER::decode);
        queue = reader.readObject(SERIALIZER::decode);

        // After the snapshot is installed, we need to cancel any existing timers and schedule
        // new ones based on the state provided by the snapshot.
        timers.values().forEach(Scheduled::cancel);
        timers.clear();
        for (LockHolder holder : queue) {
            if (holder.expire > 0) {
                // Clamp the delay at zero: a request that expired while the snapshot was in
                // transit must fail immediately rather than be scheduled with a negative delay.
                long delay = Math.max(0, holder.expire - wallClock().getTime().unixTimestamp());
                timers.put(holder.index, scheduler().schedule(Duration.ofMillis(delay), () -> {
                    timers.remove(holder.index);
                    queue.remove(holder);
                    RaftSession session = sessions().getSession(holder.session);
                    if (session != null && session.getState().active()) {
                        session.publish(FAILED, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
                    }
                }));
            }
        }
    }

    @Override
    public void onExpire(RaftSession session) {
        releaseSession(session);
    }

    @Override
    public void onClose(RaftSession session) {
        releaseSession(session);
    }

    /**
     * Applies a lock commit.
     * <p>
     * If the lock is free it is granted immediately and a LOCKED event is published to the
     * requesting session. Otherwise the outcome depends on the request timeout: {@code 0}
     * denotes a tryLock and fails immediately, a positive timeout queues the request with an
     * expiration timer, and a negative timeout queues the request with no expiration.
     *
     * @param commit the lock commit
     */
    protected void lock(Commit<Lock> commit) {
        if (lock == null) {
            // The lock is not already owned: grant it to the requester. Note that we still
            // have to publish an event to the session. The event is guaranteed to be received
            // by the client-side primitive after the LOCK response.
            lock = new LockHolder(
                commit.value().id(),
                commit.index(),
                commit.session().sessionId().id(),
                0);
            commit.session().publish(
                LOCKED,
                SERIALIZER::encode,
                new LockEvent(commit.value().id(), commit.index()));
        } else if (commit.value().timeout() == 0) {
            // A timeout of 0 indicates this is a tryLock request. Immediately fail it.
            commit.session().publish(FAILED, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
        } else if (commit.value().timeout() > 0) {
            // A positive timeout: add the request to the queue and set a timer. The lock
            // request expiration time is based on the *state machine* time - not the system
            // time - to ensure consistency across servers.
            LockHolder holder = new LockHolder(
                commit.value().id(),
                commit.index(),
                commit.session().sessionId().id(),
                wallClock().getTime().unixTimestamp() + commit.value().timeout());
            queue.add(holder);
            timers.put(commit.index(), scheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> {
                // When the lock request timer expires, remove the request from the queue and
                // publish a FAILED event to the session. This timer is guaranteed to execute
                // on the same thread as the state machine commands, so no lock is needed here.
                timers.remove(commit.index());
                queue.remove(holder);
                if (commit.session().getState().active()) {
                    commit.session().publish(
                        FAILED,
                        SERIALIZER::encode,
                        new LockEvent(commit.value().id(), commit.index()));
                }
            }));
        } else {
            // A negative timeout: add the request to the queue with no expiration.
            LockHolder holder = new LockHolder(
                commit.value().id(),
                commit.index(),
                commit.session().sessionId().id(),
                0);
            queue.add(holder);
        }
    }

    /**
     * Applies an unlock commit.
     * <p>
     * The request is ignored unless it originates from the current lock holder's session and
     * carries the current lock ID. On release, the lock is handed to the next queued waiter
     * whose session is still active; waiters with inactive sessions are skipped.
     *
     * @param commit the unlock commit
     */
    protected void unlock(Commit<Unlock> commit) {
        if (lock != null) {
            // If the commit's session does not match the current lock holder, ignore the request.
            if (lock.session != commit.session().sessionId().id()) {
                return;
            }
            // If the current lock ID does not match the requested lock ID, ignore the request.
            // This ensures that internal releases of locks that were never acquired by the
            // client-side primitive do not cause legitimate locks to be unlocked.
            if (lock.id != commit.value().id()) {
                return;
            }
            // The lock has been released. Populate the lock from the queue.
            lock = queue.poll();
            while (lock != null) {
                // If the waiter has a lock timer, cancel the timer.
                Scheduled timer = timers.remove(lock.index);
                if (timer != null) {
                    timer.cancel();
                }
                // If the lock session is for some reason inactive, continue on to the next
                // waiter. Otherwise, publish a LOCKED event to the new lock holder's session.
                // NOTE(review): the event here carries the unlock commit's index, whereas
                // releaseSession() publishes the holder's own index - confirm intentional.
                RaftSession session = sessions().getSession(lock.session);
                if (session == null || !session.getState().active()) {
                    lock = queue.poll();
                } else {
                    session.publish(
                        LOCKED,
                        SERIALIZER::encode,
                        new LockEvent(lock.id, commit.index()));
                    break;
                }
            }
        }
    }

    /**
     * Handles a session that has been closed by a client or expired by the cluster.
     * <p>
     * When a session is removed, if the session is the current lock holder then the lock is
     * released and the next session waiting in the queue is granted the lock. Additionally,
     * all pending lock requests for the session are removed from the lock queue.
     *
     * @param session the closed session
     */
    private void releaseSession(RaftSession session) {
        // Remove all instances of the session from the lock queue.
        queue.removeIf(lock -> lock.session == session.sessionId().id());

        // If the removed session is the current holder of the lock, nullify the lock and
        // attempt to grant it to the next waiter in the queue.
        if (lock != null && lock.session == session.sessionId().id()) {
            lock = queue.poll();
            while (lock != null) {
                // If the waiter has a lock timer, cancel the timer.
                Scheduled timer = timers.remove(lock.index);
                if (timer != null) {
                    timer.cancel();
                }
                // If the lock session is inactive, continue on to the next waiter. Otherwise,
                // publish a LOCKED event to the new lock holder's session.
                RaftSession lockSession = sessions().getSession(lock.session);
                if (lockSession == null || !lockSession.getState().active()) {
                    lock = queue.poll();
                } else {
                    lockSession.publish(
                        LOCKED,
                        SERIALIZER::encode,
                        new LockEvent(lock.id, lock.index));
                    break;
                }
            }
        }
    }

    /**
     * Immutable record of a held or requested lock: the client-assigned lock ID, the commit
     * index of the acquire attempt, the owning session ID, and the expiration timestamp
     * (0 means no expiration).
     * <p>
     * Declared static so the Kryo-serialized snapshot state carries no hidden reference to
     * the enclosing service instance.
     */
    private static class LockHolder {
        private final int id;
        private final long index;
        private final long session;
        private final long expire;

        LockHolder(int id, long index, long session, long expire) {
            this.id = id;
            this.index = index;
            this.session = session;
            this.expire = expire;
        }

        @Override
        public String toString() {
            return toStringHelper(this)
                .add("id", id)
                .add("index", index)
                .add("session", session)
                .add("expire", expire)
                .toString();
        }
    }
}