| /* |
| * Copyright 2018-present Open Networking Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.onosproject.store.primitives.resources.impl; |
| |
| import java.time.Duration; |
| import java.util.ArrayDeque; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Queue; |
| |
| import io.atomix.protocols.raft.service.AbstractRaftService; |
| import io.atomix.protocols.raft.service.Commit; |
| import io.atomix.protocols.raft.service.RaftServiceExecutor; |
| import io.atomix.protocols.raft.session.RaftSession; |
| import io.atomix.protocols.raft.storage.snapshot.SnapshotReader; |
| import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter; |
| import io.atomix.utils.concurrent.Scheduled; |
| import org.onlab.util.KryoNamespace; |
| import org.onosproject.store.serializers.KryoNamespaces; |
| import org.onosproject.store.service.Serializer; |
| |
| import static com.google.common.base.MoreObjects.toStringHelper; |
| import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAIL; |
| import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK; |
| import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock; |
| import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK; |
| import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock; |
| |
| /** |
| * Raft atomic value service. |
| */ |
| public class AtomixDistributedLockService extends AbstractRaftService { |
| private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder() |
| .register(KryoNamespaces.BASIC) |
| .register(AtomixDistributedLockOperations.NAMESPACE) |
| .register(AtomixDistributedLockEvents.NAMESPACE) |
| .register(LockHolder.class) |
| .register(ArrayDeque.class) |
| .build()); |
| |
| private LockHolder lock; |
| private Queue<LockHolder> queue = new ArrayDeque<>(); |
| private final Map<Long, Scheduled> timers = new HashMap<>(); |
| |
| @Override |
| protected void configure(RaftServiceExecutor executor) { |
| executor.register(LOCK, SERIALIZER::decode, this::lock); |
| executor.register(UNLOCK, SERIALIZER::decode, this::unlock); |
| } |
| |
| @Override |
| public void snapshot(SnapshotWriter writer) { |
| writer.writeObject(lock, SERIALIZER::encode); |
| writer.writeObject(queue, SERIALIZER::encode); |
| } |
| |
| @Override |
| public void install(SnapshotReader reader) { |
| lock = reader.readObject(SERIALIZER::decode); |
| queue = reader.readObject(SERIALIZER::decode); |
| timers.values().forEach(Scheduled::cancel); |
| timers.clear(); |
| for (LockHolder holder : queue) { |
| if (holder.expire > 0) { |
| timers.put(holder.index, |
| scheduler().schedule(Duration.ofMillis(holder.expire - wallClock().getTime().unixTimestamp()), |
| () -> { |
| timers.remove(holder.index); |
| queue.remove(holder); |
| RaftSession session = sessions().getSession(holder.session); |
| if (session != null && session.getState().active()) { |
| session.publish(FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index)); |
| } |
| })); |
| } |
| } |
| } |
| |
| @Override |
| public void onExpire(RaftSession session) { |
| releaseSession(session); |
| } |
| |
| @Override |
| public void onClose(RaftSession session) { |
| releaseSession(session); |
| } |
| |
| /** |
| * Applies a lock commit. |
| * |
| * @param commit the lock commit |
| */ |
| protected void lock(Commit<Lock> commit) { |
| if (lock == null) { |
| lock = new LockHolder( |
| commit.value().id(), |
| commit.index(), |
| commit.session().sessionId().id(), |
| 0); |
| commit.session().publish( |
| AtomixDistributedLockEvents.LOCK, |
| SERIALIZER::encode, |
| new LockEvent(commit.value().id(), commit.index())); |
| } else if (commit.value().timeout() == 0) { |
| commit.session().publish(FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index())); |
| } else if (commit.value().timeout() > 0) { |
| LockHolder holder = new LockHolder( |
| commit.value().id(), |
| commit.index(), |
| commit.session().sessionId().id(), |
| wallClock().getTime().unixTimestamp() + commit.value().timeout()); |
| queue.add(holder); |
| timers.put(commit.index(), scheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> { |
| timers.remove(commit.index()); |
| queue.remove(holder); |
| if (commit.session().getState().active()) { |
| commit.session().publish( |
| FAIL, |
| SERIALIZER::encode, |
| new LockEvent(commit.value().id(), commit.index())); |
| } |
| })); |
| } else { |
| LockHolder holder = new LockHolder( |
| commit.value().id(), |
| commit.index(), |
| commit.session().sessionId().id(), |
| 0); |
| queue.add(holder); |
| } |
| } |
| |
| /** |
| * Applies an unlock commit. |
| * |
| * @param commit the unlock commit |
| */ |
| protected void unlock(Commit<Unlock> commit) { |
| if (lock != null) { |
| if (lock.session != commit.session().sessionId().id()) { |
| return; |
| } |
| |
| lock = queue.poll(); |
| while (lock != null) { |
| Scheduled timer = timers.remove(lock.index); |
| if (timer != null) { |
| timer.cancel(); |
| } |
| |
| RaftSession session = sessions().getSession(lock.session); |
| if (session == null || session.getState() == RaftSession.State.EXPIRED |
| || session.getState() == RaftSession.State.CLOSED) { |
| lock = queue.poll(); |
| } else { |
| session.publish( |
| AtomixDistributedLockEvents.LOCK, |
| SERIALIZER::encode, |
| new LockEvent(lock.id, commit.index())); |
| break; |
| } |
| } |
| } |
| } |
| |
| private void releaseSession(RaftSession session) { |
| if (lock != null && lock.session == session.sessionId().id()) { |
| lock = queue.poll(); |
| while (lock != null) { |
| if (lock.session == session.sessionId().id()) { |
| lock = queue.poll(); |
| } else { |
| Scheduled timer = timers.remove(lock.index); |
| if (timer != null) { |
| timer.cancel(); |
| } |
| |
| RaftSession lockSession = sessions().getSession(lock.session); |
| if (lockSession == null || lockSession.getState() == RaftSession.State.EXPIRED |
| || lockSession.getState() == RaftSession.State.CLOSED) { |
| lock = queue.poll(); |
| } else { |
| lockSession.publish( |
| AtomixDistributedLockEvents.LOCK, |
| SERIALIZER::encode, |
| new LockEvent(lock.id, lock.index)); |
| break; |
| } |
| } |
| } |
| } |
| } |
| |
| private class LockHolder { |
| private final int id; |
| private final long index; |
| private final long session; |
| private final long expire; |
| |
| public LockHolder(int id, long index, long session, long expire) { |
| this.id = id; |
| this.index = index; |
| this.session = session; |
| this.expire = expire; |
| } |
| |
| @Override |
| public String toString() { |
| return toStringHelper(this) |
| .add("id", id) |
| .add("index", index) |
| .add("session", session) |
| .add("expire", expire) |
| .toString(); |
| } |
| } |
| } |