Add distributed lock primitive

Change-Id: I60f4581d5acb5fc9b9a21d4191874bb56348af58
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
new file mode 100644
index 0000000..c5a4250
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.utils.concurrent.Futures;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncDistributedLock;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Version;
+
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;
+
+/**
+ * Atomix lock implementation.
+ */
+public class AtomixDistributedLock extends AbstractRaftPrimitive implements AsyncDistributedLock {
+    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+        .register(KryoNamespaces.BASIC)
+        .register(AtomixDistributedLockOperations.NAMESPACE)
+        .register(AtomixDistributedLockEvents.NAMESPACE)
+        .build());
+
+    private final Map<Integer, CompletableFuture<Version>> futures = new ConcurrentHashMap<>();
+    private final AtomicInteger id = new AtomicInteger();
+    private int lock;
+
+    public AtomixDistributedLock(RaftProxy proxy) {
+        super(proxy);
+        proxy.addStateChangeListener(this::handleStateChange);
+        proxy.addEventListener(AtomixDistributedLockEvents.LOCK, SERIALIZER::decode, this::handleLocked);
+        proxy.addEventListener(AtomixDistributedLockEvents.FAIL, SERIALIZER::decode, this::handleFailed);
+    }
+
+    private void handleLocked(LockEvent event) {
+        CompletableFuture<Version> future = futures.remove(event.id());
+        if (future != null) {
+            this.lock = event.id();
+            future.complete(new Version(event.version()));
+        }
+    }
+
+    private void handleFailed(LockEvent event) {
+        CompletableFuture<Version> future = futures.remove(event.id());
+        if (future != null) {
+            future.complete(null);
+        }
+    }
+
+    private void handleStateChange(RaftProxy.State state) {
+        if (state != RaftProxy.State.CONNECTED) {
+            Iterator<Map.Entry<Integer, CompletableFuture<Version>>> iterator = futures.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Integer, CompletableFuture<Version>> entry = iterator.next();
+                entry.getValue().completeExceptionally(new StorageException.Unavailable());
+                proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(entry.getKey()));
+                iterator.remove();
+            }
+            lock = 0;
+        }
+    }
+
+    @Override
+    public CompletableFuture<Version> lock() {
+        RaftProxy.State state = proxy.getState();
+        if (state != RaftProxy.State.CONNECTED) {
+            return Futures.exceptionalFuture(new StorageException.Unavailable());
+        }
+
+        CompletableFuture<Version> future = new CompletableFuture<>();
+        int id = this.id.incrementAndGet();
+        futures.put(id, future);
+        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, -1)).whenComplete((result, error) -> {
+            if (error != null) {
+                futures.remove(id);
+                future.completeExceptionally(error);
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Optional<Version>> tryLock() {
+        RaftProxy.State state = proxy.getState();
+        if (state != RaftProxy.State.CONNECTED) {
+            return Futures.exceptionalFuture(new StorageException.Unavailable());
+        }
+
+        CompletableFuture<Version> future = new CompletableFuture<>();
+        int id = this.id.incrementAndGet();
+        futures.put(id, future);
+        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, 0)).whenComplete((result, error) -> {
+            if (error != null) {
+                futures.remove(id);
+                future.completeExceptionally(error);
+            }
+        });
+        return future.thenApply(Optional::ofNullable);
+    }
+
+    @Override
+    public CompletableFuture<Optional<Version>> tryLock(Duration timeout) {
+        RaftProxy.State state = proxy.getState();
+        if (state != RaftProxy.State.CONNECTED) {
+            return Futures.exceptionalFuture(new StorageException.Unavailable());
+        }
+
+        CompletableFuture<Version> future = new CompletableFuture<>();
+        int id = this.id.incrementAndGet();
+        futures.put(id, future);
+        proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, timeout.toMillis())).whenComplete((result, error) -> {
+            if (error != null) {
+                futures.remove(id);
+                future.completeExceptionally(error);
+            }
+        });
+        return future.thenApply(Optional::ofNullable);
+    }
+
+    @Override
+    public CompletableFuture<Void> unlock() {
+        int lock = this.lock;
+        this.lock = 0;
+        if (lock != 0) {
+            return proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock));
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Closes the lock.
+     *
+     * @return a future to be completed once the lock has been closed
+     */
+    public CompletableFuture<Void> close() {
+        return proxy.close();
+    }
+}
\ No newline at end of file