Add distributed lock primitive
Change-Id: I60f4581d5acb5fc9b9a21d4191874bb56348af58
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
new file mode 100644
index 0000000..c5a4250
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.utils.concurrent.Futures;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncDistributedLock;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Version;
+
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;
+
+/**
+ * Atomix lock implementation.
+ */
+public class AtomixDistributedLock extends AbstractRaftPrimitive implements AsyncDistributedLock {
+ private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+ .register(KryoNamespaces.BASIC)
+ .register(AtomixDistributedLockOperations.NAMESPACE)
+ .register(AtomixDistributedLockEvents.NAMESPACE)
+ .build());
+
+ private final Map<Integer, CompletableFuture<Version>> futures = new ConcurrentHashMap<>();
+ private final AtomicInteger id = new AtomicInteger();
+ private int lock;
+
+ public AtomixDistributedLock(RaftProxy proxy) {
+ super(proxy);
+ proxy.addStateChangeListener(this::handleStateChange);
+ proxy.addEventListener(AtomixDistributedLockEvents.LOCK, SERIALIZER::decode, this::handleLocked);
+ proxy.addEventListener(AtomixDistributedLockEvents.FAIL, SERIALIZER::decode, this::handleFailed);
+ }
+
+ private void handleLocked(LockEvent event) {
+ CompletableFuture<Version> future = futures.remove(event.id());
+ if (future != null) {
+ this.lock = event.id();
+ future.complete(new Version(event.version()));
+ }
+ }
+
+ private void handleFailed(LockEvent event) {
+ CompletableFuture<Version> future = futures.remove(event.id());
+ if (future != null) {
+ future.complete(null);
+ }
+ }
+
+ private void handleStateChange(RaftProxy.State state) {
+ if (state != RaftProxy.State.CONNECTED) {
+ Iterator<Map.Entry<Integer, CompletableFuture<Version>>> iterator = futures.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, CompletableFuture<Version>> entry = iterator.next();
+ entry.getValue().completeExceptionally(new StorageException.Unavailable());
+ proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(entry.getKey()));
+ iterator.remove();
+ }
+ lock = 0;
+ }
+ }
+
+ @Override
+ public CompletableFuture<Version> lock() {
+ RaftProxy.State state = proxy.getState();
+ if (state != RaftProxy.State.CONNECTED) {
+ return Futures.exceptionalFuture(new StorageException.Unavailable());
+ }
+
+ CompletableFuture<Version> future = new CompletableFuture<>();
+ int id = this.id.incrementAndGet();
+ futures.put(id, future);
+ proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, -1)).whenComplete((result, error) -> {
+ if (error != null) {
+ futures.remove(id);
+ future.completeExceptionally(error);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Optional<Version>> tryLock() {
+ RaftProxy.State state = proxy.getState();
+ if (state != RaftProxy.State.CONNECTED) {
+ return Futures.exceptionalFuture(new StorageException.Unavailable());
+ }
+
+ CompletableFuture<Version> future = new CompletableFuture<>();
+ int id = this.id.incrementAndGet();
+ futures.put(id, future);
+ proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, 0)).whenComplete((result, error) -> {
+ if (error != null) {
+ futures.remove(id);
+ future.completeExceptionally(error);
+ }
+ });
+ return future.thenApply(Optional::ofNullable);
+ }
+
+ @Override
+ public CompletableFuture<Optional<Version>> tryLock(Duration timeout) {
+ RaftProxy.State state = proxy.getState();
+ if (state != RaftProxy.State.CONNECTED) {
+ return Futures.exceptionalFuture(new StorageException.Unavailable());
+ }
+
+ CompletableFuture<Version> future = new CompletableFuture<>();
+ int id = this.id.incrementAndGet();
+ futures.put(id, future);
+ proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, timeout.toMillis())).whenComplete((result, error) -> {
+ if (error != null) {
+ futures.remove(id);
+ future.completeExceptionally(error);
+ }
+ });
+ return future.thenApply(Optional::ofNullable);
+ }
+
+ @Override
+ public CompletableFuture<Void> unlock() {
+ int lock = this.lock;
+ this.lock = 0;
+ if (lock != 0) {
+ return proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock));
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /**
+ * Closes the lock.
+ *
+ * @return a future to be completed once the lock has been closed
+ */
+ public CompletableFuture<Void> close() {
+ return proxy.close();
+ }
+}
\ No newline at end of file