Add distributed lock primitive
Change-Id: I60f4581d5acb5fc9b9a21d4191874bb56348af58
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
new file mode 100644
index 0000000..a2dab95
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+
+import io.atomix.protocols.raft.service.AbstractRaftService;
+import io.atomix.protocols.raft.service.Commit;
+import io.atomix.protocols.raft.service.RaftServiceExecutor;
+import io.atomix.protocols.raft.session.RaftSession;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import io.atomix.utils.concurrent.Scheduled;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAIL;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;
+
+/**
+ * Raft distributed lock service: tracks the current lock holder and a FIFO queue of waiting requests,
+ * failing queued requests whose positive timeout elapses before the lock is granted to them.
+ */
+public class AtomixDistributedLockService extends AbstractRaftService {
+    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+            .register(KryoNamespaces.BASIC)
+            .register(AtomixDistributedLockOperations.NAMESPACE)
+            .register(AtomixDistributedLockEvents.NAMESPACE)
+            .register(LockHolder.class)
+            .register(ArrayDeque.class)
+            .build());
+
+    /** Current lock holder, or {@code null} while the lock is free. */
+    private LockHolder lock;
+    /** Requests waiting for the lock, in arrival order. */
+    private Queue<LockHolder> queue = new ArrayDeque<>();
+    /** Expiration timers of queued requests, keyed by the index of the commit that queued them. */
+    private final Map<Long, Scheduled> timers = new HashMap<>();
+
+    @Override
+    protected void configure(RaftServiceExecutor executor) {
+        executor.register(LOCK, SERIALIZER::decode, this::lock);
+        executor.register(UNLOCK, SERIALIZER::decode, this::unlock);
+    }
+
+    @Override
+    public void snapshot(SnapshotWriter writer) {
+        writer.writeObject(lock, SERIALIZER::encode);
+        writer.writeObject(queue, SERIALIZER::encode);
+    }
+
+    @Override
+    public void install(SnapshotReader reader) {
+        lock = reader.readObject(SERIALIZER::decode);
+        queue = reader.readObject(SERIALIZER::decode);
+
+        // Timers are not snapshotted: drop the stale ones and rebuild them from the restored deadlines.
+        timers.values().forEach(Scheduled::cancel);
+        timers.clear();
+        for (LockHolder holder : queue) {
+            if (holder.expire > 0) {
+                scheduleExpiration(holder, holder.expire - wallClock().getTime().unixTimestamp());
+            }
+        }
+    }
+
+    @Override
+    public void onExpire(RaftSession session) {
+        releaseSession(session);
+    }
+
+    @Override
+    public void onClose(RaftSession session) {
+        releaseSession(session);
+    }
+
+    /**
+     * Applies a lock commit.
+     * <p>
+     * A free lock is granted immediately with a {@code LOCK} event. Otherwise a zero timeout fails the
+     * request with a {@code FAIL} event, a positive timeout queues it with an expiration timer, and a
+     * negative timeout queues it with no expiration.
+     *
+     * @param commit the lock commit
+     */
+    protected void lock(Commit<Lock> commit) {
+        int id = commit.value().id();
+        long timeout = commit.value().timeout();
+        long sessionId = commit.session().sessionId().id();
+
+        if (lock == null) {
+            lock = new LockHolder(id, commit.index(), sessionId, 0);
+            commit.session().publish(
+                    AtomixDistributedLockEvents.LOCK, SERIALIZER::encode, new LockEvent(id, commit.index()));
+        } else if (timeout == 0) {
+            commit.session().publish(FAIL, SERIALIZER::encode, new LockEvent(id, commit.index()));
+        } else if (timeout > 0) {
+            // The absolute deadline is stored so the timer can be rebuilt when a snapshot is installed.
+            LockHolder holder = new LockHolder(
+                    id, commit.index(), sessionId, wallClock().getTime().unixTimestamp() + timeout);
+            queue.add(holder);
+            scheduleExpiration(holder, timeout);
+        } else {
+            queue.add(new LockHolder(id, commit.index(), sessionId, 0));
+        }
+    }
+
+    /**
+     * Applies an unlock commit.
+     * <p>
+     * Ignored unless the committing session holds the lock; otherwise the lock passes to the next queued
+     * request whose session is still alive, using this commit's index as the lock version.
+     *
+     * @param commit the unlock commit
+     */
+    protected void unlock(Commit<Unlock> commit) {
+        // Only the session that currently holds the lock may release it.
+        if (lock == null || lock.session != commit.session().sessionId().id()) {
+            return;
+        }
+        for (lock = queue.poll(); lock != null; lock = queue.poll()) {
+            if (grant(lock, commit.index())) {
+                break;
+            }
+        }
+    }
+
+    /** Releases the lock if {@code session} holds it and passes it to the next eligible queued request. */
+    private void releaseSession(RaftSession session) {
+        if (lock == null || lock.session != session.sessionId().id()) {
+            return;
+        }
+        long sessionId = lock.session;
+        for (lock = queue.poll(); lock != null; lock = queue.poll()) {
+            if (lock.session == sessionId) {
+                // Request from the departing session: drop it, and cancel its timer so none stays
+                // scheduled for a request that is no longer queued.
+                cancelTimer(lock.index);
+            } else if (grant(lock, lock.index)) {
+                // NOTE(review): the version sent here is the request's own index, while unlock() sends the
+                // unlock commit index - confirm clients do not rely on versions growing monotonically.
+                break;
+            }
+        }
+    }
+
+    /** Schedules the timer that fails {@code holder} with a FAIL event after {@code delayMillis} ms. */
+    private void scheduleExpiration(LockHolder holder, long delayMillis) {
+        timers.put(holder.index, scheduler().schedule(Duration.ofMillis(delayMillis), () -> {
+            timers.remove(holder.index);
+            queue.remove(holder);
+            RaftSession session = sessions().getSession(holder.session);
+            if (session != null && session.getState().active()) {
+                session.publish(FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
+            }
+        }));
+    }
+
+    /**
+     * Cancels the request's expiration timer and, unless its session is missing, expired or closed,
+     * publishes a {@code LOCK} event carrying {@code version} to that session.
+     *
+     * @return {@code true} if the event was published
+     */
+    private boolean grant(LockHolder holder, long version) {
+        cancelTimer(holder.index);
+        RaftSession session = sessions().getSession(holder.session);
+        if (session == null || session.getState() == RaftSession.State.EXPIRED
+                || session.getState() == RaftSession.State.CLOSED) {
+            return false;
+        }
+        session.publish(AtomixDistributedLockEvents.LOCK, SERIALIZER::encode, new LockEvent(holder.id, version));
+        return true;
+    }
+
+    /** Cancels and forgets the expiration timer registered under the given commit index, if any. */
+    private void cancelTimer(long index) {
+        Scheduled timer = timers.remove(index);
+        if (timer != null) {
+            timer.cancel();
+        }
+    }
+
+    /** A lock request: either the current holder or an entry waiting in the queue. */
+    private static class LockHolder {
+        private final int id;
+        private final long index;
+        private final long session;
+        /** Wall-clock deadline in milliseconds, or 0 when no expiration applies. */
+        private final long expire;
+
+        public LockHolder(int id, long index, long session, long expire) {
+            this.id = id;
+            this.index = index;
+            this.session = session;
+            this.expire = expire;
+        }
+
+        @Override
+        public String toString() {
+            return toStringHelper(this)
+                    .add("id", id)
+                    .add("index", index)
+                    .add("session", session)
+                    .add("expire", expire)
+                    .toString();
+        }
+    }
+}
\ No newline at end of file