Add distributed lock primitive

Change-Id: I60f4581d5acb5fc9b9a21d4191874bb56348af58
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
new file mode 100644
index 0000000..c5a4250
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.utils.concurrent.Futures;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncDistributedLock;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Version;
+
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;
+
+/**
+ * Atomix lock implementation.
+ */
+public class AtomixDistributedLock extends AbstractRaftPrimitive implements AsyncDistributedLock {
+    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+        .register(KryoNamespaces.BASIC)
+        .register(AtomixDistributedLockOperations.NAMESPACE)
+        .register(AtomixDistributedLockEvents.NAMESPACE)
+        .build());
+
+    private final Map<Integer, CompletableFuture<Version>> futures = new ConcurrentHashMap<>();
+    private final AtomicInteger id = new AtomicInteger();
+    private int lock;
+
+    public AtomixDistributedLock(RaftProxy proxy) {
+        super(proxy);
+        proxy.addStateChangeListener(this::handleStateChange);
+        proxy.addEventListener(AtomixDistributedLockEvents.LOCK, SERIALIZER::decode, this::handleLocked);
+        proxy.addEventListener(AtomixDistributedLockEvents.FAIL, SERIALIZER::decode, this::handleFailed);
+    }
+
+    private void handleLocked(LockEvent event) {
+        CompletableFuture<Version> future = futures.remove(event.id());
+        if (future != null) {
+            this.lock = event.id();
+            future.complete(new Version(event.version()));
+        }
+    }
+
+    private void handleFailed(LockEvent event) {
+        CompletableFuture<Version> future = futures.remove(event.id());
+        if (future != null) {
+            future.complete(null);
+        }
+    }
+
+    private void handleStateChange(RaftProxy.State state) {
+        if (state != RaftProxy.State.CONNECTED) {
+            Iterator<Map.Entry<Integer, CompletableFuture<Version>>> iterator = futures.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Integer, CompletableFuture<Version>> entry = iterator.next();
+                entry.getValue().completeExceptionally(new StorageException.Unavailable());
+                proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(entry.getKey()));
+                iterator.remove();
+            }
+            lock = 0;
+        }
+    }
+
+    @Override
+    public CompletableFuture<Version> lock() {
+        RaftProxy.State state = proxy.getState();
+        if (state != RaftProxy.State.CONNECTED) {
+            return Futures.exceptionalFuture(new StorageException.Unavailable());
+        }
+
+        CompletableFuture<Version> future = new CompletableFuture<>();
+        int id = this.id.incrementAndGet();
+        futures.put(id, future);
+        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, -1)).whenComplete((result, error) -> {
+            if (error != null) {
+                futures.remove(id);
+                future.completeExceptionally(error);
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Optional<Version>> tryLock() {
+        RaftProxy.State state = proxy.getState();
+        if (state != RaftProxy.State.CONNECTED) {
+            return Futures.exceptionalFuture(new StorageException.Unavailable());
+        }
+
+        CompletableFuture<Version> future = new CompletableFuture<>();
+        int id = this.id.incrementAndGet();
+        futures.put(id, future);
+        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, 0)).whenComplete((result, error) -> {
+            if (error != null) {
+                futures.remove(id);
+                future.completeExceptionally(error);
+            }
+        });
+        return future.thenApply(Optional::ofNullable);
+    }
+
+    @Override
+    public CompletableFuture<Optional<Version>> tryLock(Duration timeout) {
+        RaftProxy.State state = proxy.getState();
+        if (state != RaftProxy.State.CONNECTED) {
+            return Futures.exceptionalFuture(new StorageException.Unavailable());
+        }
+
+        CompletableFuture<Version> future = new CompletableFuture<>();
+        int id = this.id.incrementAndGet();
+        futures.put(id, future);
+        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, timeout.toMillis())).whenComplete((result, error) -> {
+            if (error != null) {
+                futures.remove(id);
+                future.completeExceptionally(error);
+            }
+        });
+        return future.thenApply(Optional::ofNullable);
+    }
+
+    @Override
+    public CompletableFuture<Void> unlock() {
+        int lock = this.lock;
+        this.lock = 0;
+        if (lock != 0) {
+            return proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock));
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Closes the lock.
+     *
+     * @return a future to be completed once the lock has been closed
+     */
+    public CompletableFuture<Void> close() {
+        return proxy.close();
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java
new file mode 100644
index 0000000..0ef9270
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.protocols.raft.event.EventType;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+
+/**
+ * Raft value events.
+ */
+public enum AtomixDistributedLockEvents implements EventType {
+    LOCK("lock"),
+    FAIL("fail");
+
+    private final String id;
+
+    AtomixDistributedLockEvents(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String id() {
+        return id;
+    }
+
+    public static final KryoNamespace NAMESPACE = KryoNamespace.newBuilder()
+        .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 50)
+        .register(LockEvent.class)
+        .register(byte[].class)
+        .build(AtomixDistributedLockEvents.class.getSimpleName());
+
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockOperations.java
new file mode 100644
index 0000000..5cfbc84
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockOperations.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.protocols.raft.operation.OperationId;
+import io.atomix.protocols.raft.operation.OperationType;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * {@link org.onosproject.store.service.DistributedLock} operations.
+ * <p>
+ * WARNING: Do not refactor enum values. Only add to them.
+ * Changing values risk breaking the ability to backup/restore/upgrade clusters.
+ */
+public enum AtomixDistributedLockOperations implements OperationId {
+    LOCK(OperationType.COMMAND),
+    UNLOCK(OperationType.COMMAND);
+
+    private final OperationType type;
+
+    AtomixDistributedLockOperations(OperationType type) {
+        this.type = type;
+    }
+
+    @Override
+    public String id() {
+        return name();
+    }
+
+    @Override
+    public OperationType type() {
+        return type;
+    }
+
+    public static final KryoNamespace NAMESPACE = KryoNamespace.newBuilder()
+        .register(KryoNamespaces.BASIC)
+        .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+        .register(Lock.class)
+        .register(Unlock.class)
+        .build(AtomixDistributedLockOperations.class.getSimpleName());
+
+    /**
+     * Abstract lock operation.
+     */
+    public abstract static class LockOperation {
+        @Override
+        public String toString() {
+            return toStringHelper(this).toString();
+        }
+    }
+
+    /**
+     * Lock command.
+     */
+    public static class Lock extends LockOperation {
+        private final int id;
+        private final long timeout;
+
+        public Lock() {
+            this(0, 0);
+        }
+
+        public Lock(int id, long timeout) {
+            this.id = id;
+            this.timeout = timeout;
+        }
+
+        /**
+         * Returns the lock identifier.
+         *
+         * @return the lock identifier
+         */
+        public int id() {
+            return id;
+        }
+
+        /**
+         * Returns the lock attempt timeout.
+         *
+         * @return the lock attempt timeout
+         */
+        public long timeout() {
+            return timeout;
+        }
+
+        @Override
+        public String toString() {
+            return toStringHelper(this)
+                .add("id", id)
+                .add("timeout", timeout)
+                .toString();
+        }
+    }
+
+    /**
+     * Unlock command.
+     */
+    public static class Unlock extends LockOperation {
+        private final int id;
+
+        public Unlock(int id) {
+            this.id = id;
+        }
+
+        /**
+         * Returns the lock identifier.
+         *
+         * @return the lock identifier
+         */
+        public int id() {
+            return id;
+        }
+
+        @Override
+        public String toString() {
+            return toStringHelper(this)
+                .add("id", id)
+                .toString();
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
new file mode 100644
index 0000000..a2dab95
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+
+import io.atomix.protocols.raft.service.AbstractRaftService;
+import io.atomix.protocols.raft.service.Commit;
+import io.atomix.protocols.raft.service.RaftServiceExecutor;
+import io.atomix.protocols.raft.session.RaftSession;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import io.atomix.utils.concurrent.Scheduled;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAIL;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;
+
+/**
+ * Raft atomic value service.
+ */
+public class AtomixDistributedLockService extends AbstractRaftService {
+    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+        .register(KryoNamespaces.BASIC)
+        .register(AtomixDistributedLockOperations.NAMESPACE)
+        .register(AtomixDistributedLockEvents.NAMESPACE)
+        .register(LockHolder.class)
+        .register(ArrayDeque.class)
+        .build());
+
+    private LockHolder lock;
+    private Queue<LockHolder> queue = new ArrayDeque<>();
+    private final Map<Long, Scheduled> timers = new HashMap<>();
+
+    @Override
+    protected void configure(RaftServiceExecutor executor) {
+        executor.register(LOCK, SERIALIZER::decode, this::lock);
+        executor.register(UNLOCK, SERIALIZER::decode, this::unlock);
+    }
+
+    @Override
+    public void snapshot(SnapshotWriter writer) {
+        writer.writeObject(lock, SERIALIZER::encode);
+        writer.writeObject(queue, SERIALIZER::encode);
+    }
+
+    @Override
+    public void install(SnapshotReader reader) {
+        lock = reader.readObject(SERIALIZER::decode);
+        queue = reader.readObject(SERIALIZER::decode);
+        timers.values().forEach(Scheduled::cancel);
+        timers.clear();
+        for (LockHolder holder : queue) {
+            if (holder.expire > 0) {
+                timers.put(holder.index,
+                    scheduler().schedule(Duration.ofMillis(holder.expire - wallClock().getTime().unixTimestamp()),
+                        () -> {
+                            timers.remove(holder.index);
+                            queue.remove(holder);
+                            RaftSession session = sessions().getSession(holder.session);
+                            if (session != null && session.getState().active()) {
+                                session.publish(FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
+                            }
+                        }));
+            }
+        }
+    }
+
+    @Override
+    public void onExpire(RaftSession session) {
+        releaseSession(session);
+    }
+
+    @Override
+    public void onClose(RaftSession session) {
+        releaseSession(session);
+    }
+
+    /**
+     * Applies a lock commit.
+     *
+     * @param commit the lock commit
+     */
+    protected void lock(Commit<Lock> commit) {
+        if (lock == null) {
+            lock = new LockHolder(
+                commit.value().id(),
+                commit.index(),
+                commit.session().sessionId().id(),
+                0);
+            commit.session().publish(
+                AtomixDistributedLockEvents.LOCK,
+                SERIALIZER::encode,
+                new LockEvent(commit.value().id(), commit.index()));
+        } else if (commit.value().timeout() == 0) {
+            commit.session().publish(FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
+        } else if (commit.value().timeout() > 0) {
+            LockHolder holder = new LockHolder(
+                commit.value().id(),
+                commit.index(),
+                commit.session().sessionId().id(),
+                wallClock().getTime().unixTimestamp() + commit.value().timeout());
+            queue.add(holder);
+            timers.put(commit.index(), scheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> {
+                timers.remove(commit.index());
+                queue.remove(holder);
+                if (commit.session().getState().active()) {
+                    commit.session().publish(
+                        FAIL,
+                        SERIALIZER::encode,
+                        new LockEvent(commit.value().id(), commit.index()));
+                }
+            }));
+        } else {
+            LockHolder holder = new LockHolder(
+                commit.value().id(),
+                commit.index(),
+                commit.session().sessionId().id(),
+                0);
+            queue.add(holder);
+        }
+    }
+
+    /**
+     * Applies an unlock commit.
+     *
+     * @param commit the unlock commit
+     */
+    protected void unlock(Commit<Unlock> commit) {
+        if (lock != null) {
+            if (lock.session != commit.session().sessionId().id()) {
+                return;
+            }
+
+            lock = queue.poll();
+            while (lock != null) {
+                Scheduled timer = timers.remove(lock.index);
+                if (timer != null) {
+                    timer.cancel();
+                }
+
+                RaftSession session = sessions().getSession(lock.session);
+                if (session == null || session.getState() == RaftSession.State.EXPIRED
+                    || session.getState() == RaftSession.State.CLOSED) {
+                    lock = queue.poll();
+                } else {
+                    session.publish(
+                        AtomixDistributedLockEvents.LOCK,
+                        SERIALIZER::encode,
+                        new LockEvent(lock.id, commit.index()));
+                    break;
+                }
+            }
+        }
+    }
+
+    private void releaseSession(RaftSession session) {
+        if (lock != null && lock.session == session.sessionId().id()) {
+            lock = queue.poll();
+            while (lock != null) {
+                if (lock.session == session.sessionId().id()) {
+                    lock = queue.poll();
+                } else {
+                    Scheduled timer = timers.remove(lock.index);
+                    if (timer != null) {
+                        timer.cancel();
+                    }
+
+                    RaftSession lockSession = sessions().getSession(lock.session);
+                    if (lockSession == null || lockSession.getState() == RaftSession.State.EXPIRED
+                        || lockSession.getState() == RaftSession.State.CLOSED) {
+                        lock = queue.poll();
+                    } else {
+                        lockSession.publish(
+                            AtomixDistributedLockEvents.LOCK,
+                            SERIALIZER::encode,
+                            new LockEvent(lock.id, lock.index));
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    private class LockHolder {
+        private final int id;
+        private final long index;
+        private final long session;
+        private final long expire;
+
+        public LockHolder(int id, long index, long session, long expire) {
+            this.id = id;
+            this.index = index;
+            this.session = session;
+            this.expire = expire;
+        }
+
+        @Override
+        public String toString() {
+            return toStringHelper(this)
+                .add("id", id)
+                .add("index", index)
+                .add("session", session)
+                .add("expire", expire)
+                .toString();
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/LockEvent.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/LockEvent.java
new file mode 100644
index 0000000..cddb41f
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/LockEvent.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Locked event.
+ */
+public class LockEvent {
+  private final int id;
+  private final long version;
+
+  public LockEvent() {
+    this(0, 0);
+  }
+
+  public LockEvent(int id, long version) {
+    this.id = id;
+    this.version = version;
+  }
+
+  /**
+   * Returns the lock ID.
+   *
+   * @return The lock ID.
+   */
+  public int id() {
+    return id;
+  }
+
+  /**
+   * Returns the lock version.
+   *
+   * @return The lock version.
+   */
+  public long version() {
+    return version;
+  }
+
+  @Override
+  public String toString() {
+    return toStringHelper(this)
+        .add("id", id)
+        .add("version", version)
+        .toString();
+  }
+}