blob: a2dab953b49fecfebe755e1f2595e50c78da3272 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftServiceExecutor;
import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.utils.concurrent.Scheduled;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import static com.google.common.base.MoreObjects.toStringHelper;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAIL;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;
/**
* Raft atomic value service.
*/
public class AtomixDistributedLockService extends AbstractRaftService {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
.register(AtomixDistributedLockOperations.NAMESPACE)
.register(AtomixDistributedLockEvents.NAMESPACE)
.register(LockHolder.class)
.register(ArrayDeque.class)
.build());
private LockHolder lock;
private Queue<LockHolder> queue = new ArrayDeque<>();
private final Map<Long, Scheduled> timers = new HashMap<>();
@Override
protected void configure(RaftServiceExecutor executor) {
executor.register(LOCK, SERIALIZER::decode, this::lock);
executor.register(UNLOCK, SERIALIZER::decode, this::unlock);
}
@Override
public void snapshot(SnapshotWriter writer) {
writer.writeObject(lock, SERIALIZER::encode);
writer.writeObject(queue, SERIALIZER::encode);
}
@Override
public void install(SnapshotReader reader) {
lock = reader.readObject(SERIALIZER::decode);
queue = reader.readObject(SERIALIZER::decode);
timers.values().forEach(Scheduled::cancel);
timers.clear();
for (LockHolder holder : queue) {
if (holder.expire > 0) {
timers.put(holder.index,
scheduler().schedule(Duration.ofMillis(holder.expire - wallClock().getTime().unixTimestamp()),
() -> {
timers.remove(holder.index);
queue.remove(holder);
RaftSession session = sessions().getSession(holder.session);
if (session != null && session.getState().active()) {
session.publish(FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
}
}));
}
}
}
@Override
public void onExpire(RaftSession session) {
releaseSession(session);
}
@Override
public void onClose(RaftSession session) {
releaseSession(session);
}
/**
* Applies a lock commit.
*
* @param commit the lock commit
*/
protected void lock(Commit<Lock> commit) {
if (lock == null) {
lock = new LockHolder(
commit.value().id(),
commit.index(),
commit.session().sessionId().id(),
0);
commit.session().publish(
AtomixDistributedLockEvents.LOCK,
SERIALIZER::encode,
new LockEvent(commit.value().id(), commit.index()));
} else if (commit.value().timeout() == 0) {
commit.session().publish(FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
} else if (commit.value().timeout() > 0) {
LockHolder holder = new LockHolder(
commit.value().id(),
commit.index(),
commit.session().sessionId().id(),
wallClock().getTime().unixTimestamp() + commit.value().timeout());
queue.add(holder);
timers.put(commit.index(), scheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> {
timers.remove(commit.index());
queue.remove(holder);
if (commit.session().getState().active()) {
commit.session().publish(
FAIL,
SERIALIZER::encode,
new LockEvent(commit.value().id(), commit.index()));
}
}));
} else {
LockHolder holder = new LockHolder(
commit.value().id(),
commit.index(),
commit.session().sessionId().id(),
0);
queue.add(holder);
}
}
/**
* Applies an unlock commit.
*
* @param commit the unlock commit
*/
protected void unlock(Commit<Unlock> commit) {
if (lock != null) {
if (lock.session != commit.session().sessionId().id()) {
return;
}
lock = queue.poll();
while (lock != null) {
Scheduled timer = timers.remove(lock.index);
if (timer != null) {
timer.cancel();
}
RaftSession session = sessions().getSession(lock.session);
if (session == null || session.getState() == RaftSession.State.EXPIRED
|| session.getState() == RaftSession.State.CLOSED) {
lock = queue.poll();
} else {
session.publish(
AtomixDistributedLockEvents.LOCK,
SERIALIZER::encode,
new LockEvent(lock.id, commit.index()));
break;
}
}
}
}
private void releaseSession(RaftSession session) {
if (lock != null && lock.session == session.sessionId().id()) {
lock = queue.poll();
while (lock != null) {
if (lock.session == session.sessionId().id()) {
lock = queue.poll();
} else {
Scheduled timer = timers.remove(lock.index);
if (timer != null) {
timer.cancel();
}
RaftSession lockSession = sessions().getSession(lock.session);
if (lockSession == null || lockSession.getState() == RaftSession.State.EXPIRED
|| lockSession.getState() == RaftSession.State.CLOSED) {
lock = queue.poll();
} else {
lockSession.publish(
AtomixDistributedLockEvents.LOCK,
SERIALIZER::encode,
new LockEvent(lock.id, lock.index));
break;
}
}
}
}
}
private class LockHolder {
private final int id;
private final long index;
private final long session;
private final long expire;
public LockHolder(int id, long index, long session, long expire) {
this.id = id;
this.index = index;
this.session = session;
this.expire = expire;
}
@Override
public String toString() {
return toStringHelper(this)
.add("id", id)
.add("index", index)
.add("session", session)
.add("expire", expire)
.toString();
}
}
}