Add distributed lock primitive
Change-Id: I60f4581d5acb5fc9b9a21d4191874bb56348af58
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultDistributedLock.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultDistributedLock.java
new file mode 100644
index 0000000..8e48771
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultDistributedLock.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.onosproject.store.service.AsyncDistributedLock;
+import org.onosproject.store.service.DistributedLock;
+import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Synchronous;
+import org.onosproject.store.service.Version;
+
+/**
+ * Default implementation for a {@code DistributedLock} backed by a {@link AsyncDistributedLock}.
+ */
+public class DefaultDistributedLock extends Synchronous<AsyncDistributedLock> implements DistributedLock {
+
+ private final AsyncDistributedLock asyncLock;
+ private final long operationTimeoutMillis;
+
+ public DefaultDistributedLock(AsyncDistributedLock asyncLock, long operationTimeoutMillis) {
+ super(asyncLock);
+ this.asyncLock = asyncLock;
+ this.operationTimeoutMillis = operationTimeoutMillis;
+ }
+
+ @Override
+ public Version lock() {
+ return complete(asyncLock.lock());
+ }
+
+ @Override
+ public Optional<Version> tryLock() {
+ return complete(asyncLock.tryLock());
+ }
+
+ @Override
+ public Optional<Version> tryLock(Duration timeout) {
+ return complete(asyncLock.tryLock(timeout));
+ }
+
+ @Override
+ public void unlock() {
+ complete(asyncLock.unlock());
+ }
+
+ private <T> T complete(CompletableFuture<T> future) {
+ if (operationTimeoutMillis == -1) {
+ return future.join();
+ }
+ try {
+ return future.get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new StorageException.Interrupted();
+ } catch (TimeoutException e) {
+ throw new StorageException.Timeout();
+ } catch (ExecutionException e) {
+ throw new StorageException(e.getCause());
+ }
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
index 6b2a54c..4989ad0 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
@@ -24,6 +24,7 @@
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncDistributedLock;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncLeaderElector;
@@ -115,6 +116,14 @@
<E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer);
/**
+ * Creates a new {@code AsyncDistributedLock}.
+ *
+ * @param name lock name
+ * @return lock
+ */
+ AsyncDistributedLock newAsyncDistributedLock(String name);
+
+ /**
* Creates a new {@code AsyncLeaderElector}.
*
* @param name leader elector name
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncDistributedLock.java b/core/api/src/main/java/org/onosproject/store/service/AsyncDistributedLock.java
new file mode 100644
index 0000000..be35b5e
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncDistributedLock.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import org.onosproject.store.primitives.DefaultDistributedLock;
+
+/**
+ * Asynchronous lock primitive.
+ */
+public interface AsyncDistributedLock extends DistributedPrimitive {
+ @Override
+ default Type primitiveType() {
+ return Type.LOCK;
+ }
+
+ /**
+ * Acquires the lock, blocking until it's available.
+ *
+ * @return future to be completed once the lock has been acquired
+ */
+ CompletableFuture<Version> lock();
+
+ /**
+ * Attempts to acquire the lock.
+ *
+ * @return future to be completed with a boolean indicating whether the lock was acquired
+ */
+ CompletableFuture<Optional<Version>> tryLock();
+
+ /**
+ * Attempts to acquire the lock for a specified amount of time.
+ *
+ * @param timeout the timeout after which to give up attempting to acquire the lock
+ * @return future to be completed with a boolean indicating whether the lock was acquired
+ */
+ CompletableFuture<Optional<Version>> tryLock(Duration timeout);
+
+ /**
+ * Unlocks the lock.
+ *
+ * @return future to be completed once the lock has been released
+ */
+ CompletableFuture<Void> unlock();
+
+ default DistributedLock asLock() {
+ return asLock(-1);
+ }
+
+ default DistributedLock asLock(long timeoutMillis) {
+ return new DefaultDistributedLock(this, timeoutMillis);
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedLock.java b/core/api/src/main/java/org/onosproject/store/service/DistributedLock.java
new file mode 100644
index 0000000..cce53ff
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedLock.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Asynchronous lock primitive.
+ */
+public interface DistributedLock extends DistributedPrimitive {
+ @Override
+ default Type primitiveType() {
+ return Type.LOCK;
+ }
+
+ /**
+ * Acquires the lock, blocking until it's available.
+ *
+ * @return the acquired lock version
+ */
+ Version lock();
+
+ /**
+ * Attempts to acquire the lock.
+ *
+ * @return indicates whether the lock was acquired
+ */
+ Optional<Version> tryLock();
+
+ /**
+ * Attempts to acquire the lock for a specified amount of time.
+ *
+ * @param timeout the timeout after which to give up attempting to acquire the lock
+ * @return indicates whether the lock was acquired
+ */
+ Optional<Version> tryLock(Duration timeout);
+
+ /**
+ * Unlocks the lock.
+ */
+ void unlock();
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedLockBuilder.java b/core/api/src/main/java/org/onosproject/store/service/DistributedLockBuilder.java
new file mode 100644
index 0000000..7e61c6a
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedLockBuilder.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
+
+/**
+ * Builder for DistributedLock.
+ */
+public abstract class DistributedLockBuilder
+ extends DistributedPrimitiveBuilder<DistributedLockBuilder, AsyncDistributedLock> {
+ public DistributedLockBuilder() {
+ super(DistributedPrimitive.Type.LOCK);
+ }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
index 8a440f3..620e16d 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
@@ -99,7 +99,12 @@
/**
* Transaction Context.
*/
- TRANSACTION_CONTEXT
+ TRANSACTION_CONTEXT,
+
+ /**
+ * Distributed lock.
+ */
+ LOCK,
}
/**
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index d863983..96e6037 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -107,6 +107,13 @@
<V> AtomicValueBuilder<V> atomicValueBuilder();
/**
+ * Creates a new DistributedLockBuilder.
+ *
+ * @return lock builder
+ */
+ DistributedLockBuilder lockBuilder();
+
+ /**
* Creates a new LeaderElectorBuilder.
*
* @return leader elector builder
diff --git a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
index 14850ce..4d528e2 100644
--- a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
@@ -60,6 +60,11 @@
}
@Override
+ public DistributedLockBuilder lockBuilder() {
+ return null;
+ }
+
+ @Override
public LeaderElectorBuilder leaderElectorBuilder() {
return null;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedLockBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedLockBuilder.java
new file mode 100644
index 0000000..fffebcc
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedLockBuilder.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.service.AsyncDistributedLock;
+import org.onosproject.store.service.DistributedLockBuilder;
+
+/**
+ * Default implementation of DistributedLockBuilder.
+ */
+public class DefaultDistributedLockBuilder extends DistributedLockBuilder {
+
+ private final DistributedPrimitiveCreator primitiveCreator;
+
+ public DefaultDistributedLockBuilder(DistributedPrimitiveCreator primitiveCreator) {
+ this.primitiveCreator = primitiveCreator;
+ }
+
+ @Override
+ public AsyncDistributedLock build() {
+ return primitiveCreator.newAsyncDistributedLock(name());
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index b7bb8f6..e80d1a3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -37,6 +37,7 @@
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncDistributedLock;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncLeaderElector;
@@ -123,6 +124,11 @@
}
@Override
+ public AsyncDistributedLock newAsyncDistributedLock(String name) {
+ return getCreator(name).newAsyncDistributedLock(name);
+ }
+
+ @Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
checkNotNull(name);
Map<PartitionId, AsyncLeaderElector> leaderElectors =
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 822641c..df54158 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -53,6 +53,7 @@
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMultimapBuilder;
import org.onosproject.store.service.ConsistentTreeMapBuilder;
+import org.onosproject.store.service.DistributedLockBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
import org.onosproject.store.service.DocumentTreeBuilder;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
@@ -191,6 +192,12 @@
}
@Override
+ public DistributedLockBuilder lockBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultDistributedLockBuilder(federatedPrimitiveCreator);
+ }
+
+ @Override
public LeaderElectorBuilder leaderElectorBuilder() {
checkPermission(STORAGE_WRITE);
return new DefaultLeaderElectorBuilder(federatedPrimitiveCreator);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 842e047..cd2dd75 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -39,6 +39,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapService;
import org.onosproject.store.primitives.resources.impl.AtomixCounterService;
+import org.onosproject.store.primitives.resources.impl.AtomixDistributedLockService;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeService;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorService;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueService;
@@ -75,6 +76,7 @@
() -> new AtomixDocumentTreeService(Ordering.NATURAL))
.put(String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), Ordering.INSERTION),
() -> new AtomixDocumentTreeService(Ordering.INSERTION))
+ .put(DistributedPrimitive.Type.LOCK.name(), AtomixDistributedLockService::new)
.build();
public StoragePartition(Partition partition,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 999e694..f0c61c2c1d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -34,6 +34,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimap;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMap;
import org.onosproject.store.primitives.resources.impl.AtomixCounter;
+import org.onosproject.store.primitives.resources.impl.AtomixDistributedLock;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTree;
import org.onosproject.store.primitives.resources.impl.AtomixIdGenerator;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
@@ -46,6 +47,7 @@
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncDistributedLock;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncLeaderElector;
@@ -260,6 +262,21 @@
}
@Override
+ public AsyncDistributedLock newAsyncDistributedLock(String name) {
+ return new AtomixDistributedLock(client.newProxyBuilder()
+ .withName(name)
+ .withServiceType(DistributedPrimitive.Type.LOCK.name())
+ .withReadConsistency(ReadConsistency.LINEARIZABLE)
+ .withCommunicationStrategy(CommunicationStrategy.LEADER)
+ .withMinTimeout(Duration.ofSeconds(1))
+ .withMaxTimeout(Duration.ofSeconds(5))
+ .withMaxRetries(MAX_RETRIES)
+ .build()
+ .open()
+ .join());
+ }
+
+ @Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
AtomixLeaderElector leaderElector = new AtomixLeaderElector(client.newProxyBuilder()
.withName(name)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
new file mode 100644
index 0000000..c5a4250
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLock.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.utils.concurrent.Futures;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncDistributedLock;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Version;
+
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;
+
+/**
+ * Atomix lock implementation.
+ */
+public class AtomixDistributedLock extends AbstractRaftPrimitive implements AsyncDistributedLock {
+ private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+ .register(KryoNamespaces.BASIC)
+ .register(AtomixDistributedLockOperations.NAMESPACE)
+ .register(AtomixDistributedLockEvents.NAMESPACE)
+ .build());
+
+ private final Map<Integer, CompletableFuture<Version>> futures = new ConcurrentHashMap<>();
+ private final AtomicInteger id = new AtomicInteger();
+ private int lock;
+
+ public AtomixDistributedLock(RaftProxy proxy) {
+ super(proxy);
+ proxy.addStateChangeListener(this::handleStateChange);
+ proxy.addEventListener(AtomixDistributedLockEvents.LOCK, SERIALIZER::decode, this::handleLocked);
+ proxy.addEventListener(AtomixDistributedLockEvents.FAIL, SERIALIZER::decode, this::handleFailed);
+ }
+
+ private void handleLocked(LockEvent event) {
+ CompletableFuture<Version> future = futures.remove(event.id());
+ if (future != null) {
+ this.lock = event.id();
+ future.complete(new Version(event.version()));
+ }
+ }
+
+ private void handleFailed(LockEvent event) {
+ CompletableFuture<Version> future = futures.remove(event.id());
+ if (future != null) {
+ future.complete(null);
+ }
+ }
+
+ private void handleStateChange(RaftProxy.State state) {
+ if (state != RaftProxy.State.CONNECTED) {
+ Iterator<Map.Entry<Integer, CompletableFuture<Version>>> iterator = futures.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, CompletableFuture<Version>> entry = iterator.next();
+ entry.getValue().completeExceptionally(new StorageException.Unavailable());
+ proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(entry.getKey()));
+ iterator.remove();
+ }
+ lock = 0;
+ }
+ }
+
+ @Override
+ public CompletableFuture<Version> lock() {
+ RaftProxy.State state = proxy.getState();
+ if (state != RaftProxy.State.CONNECTED) {
+ return Futures.exceptionalFuture(new StorageException.Unavailable());
+ }
+
+ CompletableFuture<Version> future = new CompletableFuture<>();
+ int id = this.id.incrementAndGet();
+ futures.put(id, future);
+ proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, -1)).whenComplete((result, error) -> {
+ if (error != null) {
+ futures.remove(id);
+ future.completeExceptionally(error);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Optional<Version>> tryLock() {
+ RaftProxy.State state = proxy.getState();
+ if (state != RaftProxy.State.CONNECTED) {
+ return Futures.exceptionalFuture(new StorageException.Unavailable());
+ }
+
+ CompletableFuture<Version> future = new CompletableFuture<>();
+ int id = this.id.incrementAndGet();
+ futures.put(id, future);
+ proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, 0)).whenComplete((result, error) -> {
+ if (error != null) {
+ futures.remove(id);
+ future.completeExceptionally(error);
+ }
+ });
+ return future.thenApply(Optional::ofNullable);
+ }
+
+ @Override
+ public CompletableFuture<Optional<Version>> tryLock(Duration timeout) {
+ RaftProxy.State state = proxy.getState();
+ if (state != RaftProxy.State.CONNECTED) {
+ return Futures.exceptionalFuture(new StorageException.Unavailable());
+ }
+
+ CompletableFuture<Version> future = new CompletableFuture<>();
+ int id = this.id.incrementAndGet();
+ futures.put(id, future);
+ proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, timeout.toMillis())).whenComplete((result, error) -> {
+ if (error != null) {
+ futures.remove(id);
+ future.completeExceptionally(error);
+ }
+ });
+ return future.thenApply(Optional::ofNullable);
+ }
+
+ @Override
+ public CompletableFuture<Void> unlock() {
+ int lock = this.lock;
+ this.lock = 0;
+ if (lock != 0) {
+ return proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock));
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /**
+ * Closes the lock.
+ *
+ * @return a future to be completed once the lock has been closed
+ */
+ public CompletableFuture<Void> close() {
+ return proxy.close();
+ }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java
new file mode 100644
index 0000000..0ef9270
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockEvents.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.protocols.raft.event.EventType;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+
+/**
+ * Raft value events.
+ */
+public enum AtomixDistributedLockEvents implements EventType {
+ LOCK("lock"),
+ FAIL("fail");
+
+ private final String id;
+
+ AtomixDistributedLockEvents(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String id() {
+ return id;
+ }
+
+ public static final KryoNamespace NAMESPACE = KryoNamespace.newBuilder()
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 50)
+ .register(LockEvent.class)
+ .register(byte[].class)
+ .build(AtomixDistributedLockEvents.class.getSimpleName());
+
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockOperations.java
new file mode 100644
index 0000000..5cfbc84
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockOperations.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.protocols.raft.operation.OperationId;
+import io.atomix.protocols.raft.operation.OperationType;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * {@link org.onosproject.store.service.DistributedLock} operations.
+ * <p>
+ * WARNING: Do not refactor enum values. Only add to them.
+ * Changing values risk breaking the ability to backup/restore/upgrade clusters.
+ */
+public enum AtomixDistributedLockOperations implements OperationId {
+ LOCK(OperationType.COMMAND),
+ UNLOCK(OperationType.COMMAND);
+
+ private final OperationType type;
+
+ AtomixDistributedLockOperations(OperationType type) {
+ this.type = type;
+ }
+
+ @Override
+ public String id() {
+ return name();
+ }
+
+ @Override
+ public OperationType type() {
+ return type;
+ }
+
+ public static final KryoNamespace NAMESPACE = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.BASIC)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .register(Lock.class)
+ .register(Unlock.class)
+ .build(AtomixDistributedLockOperations.class.getSimpleName());
+
+ /**
+ * Abstract lock operation.
+ */
+ public abstract static class LockOperation {
+ @Override
+ public String toString() {
+ return toStringHelper(this).toString();
+ }
+ }
+
+ /**
+ * Lock command.
+ */
+ public static class Lock extends LockOperation {
+ private final int id;
+ private final long timeout;
+
+ public Lock() {
+ this(0, 0);
+ }
+
+ public Lock(int id, long timeout) {
+ this.id = id;
+ this.timeout = timeout;
+ }
+
+ /**
+ * Returns the lock identifier.
+ *
+ * @return the lock identifier
+ */
+ public int id() {
+ return id;
+ }
+
+ /**
+ * Returns the lock attempt timeout.
+ *
+ * @return the lock attempt timeout
+ */
+ public long timeout() {
+ return timeout;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("id", id)
+ .add("timeout", timeout)
+ .toString();
+ }
+ }
+
+ /**
+ * Unlock command.
+ */
+ public static class Unlock extends LockOperation {
+ private final int id;
+
+ public Unlock(int id) {
+ this.id = id;
+ }
+
+ /**
+ * Returns the lock identifier.
+ *
+ * @return the lock identifier
+ */
+ public int id() {
+ return id;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("id", id)
+ .toString();
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
new file mode 100644
index 0000000..a2dab95
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockService.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+
+import io.atomix.protocols.raft.service.AbstractRaftService;
+import io.atomix.protocols.raft.service.Commit;
+import io.atomix.protocols.raft.service.RaftServiceExecutor;
+import io.atomix.protocols.raft.session.RaftSession;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import io.atomix.utils.concurrent.Scheduled;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAIL;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
+import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;
+
+/**
+ * Raft atomic value service.
+ */
+public class AtomixDistributedLockService extends AbstractRaftService {
+ private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+ .register(KryoNamespaces.BASIC)
+ .register(AtomixDistributedLockOperations.NAMESPACE)
+ .register(AtomixDistributedLockEvents.NAMESPACE)
+ .register(LockHolder.class)
+ .register(ArrayDeque.class)
+ .build());
+
+ private LockHolder lock;
+ private Queue<LockHolder> queue = new ArrayDeque<>();
+ private final Map<Long, Scheduled> timers = new HashMap<>();
+
+ @Override
+ protected void configure(RaftServiceExecutor executor) {
+ executor.register(LOCK, SERIALIZER::decode, this::lock);
+ executor.register(UNLOCK, SERIALIZER::decode, this::unlock);
+ }
+
+ @Override
+ public void snapshot(SnapshotWriter writer) {
+ writer.writeObject(lock, SERIALIZER::encode);
+ writer.writeObject(queue, SERIALIZER::encode);
+ }
+
+ @Override
+ public void install(SnapshotReader reader) {
+ lock = reader.readObject(SERIALIZER::decode);
+ queue = reader.readObject(SERIALIZER::decode);
+ timers.values().forEach(Scheduled::cancel);
+ timers.clear();
+ for (LockHolder holder : queue) {
+ if (holder.expire > 0) {
+ timers.put(holder.index,
+ scheduler().schedule(Duration.ofMillis(holder.expire - wallClock().getTime().unixTimestamp()),
+ () -> {
+ timers.remove(holder.index);
+ queue.remove(holder);
+ RaftSession session = sessions().getSession(holder.session);
+ if (session != null && session.getState().active()) {
+ session.publish(FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
+ }
+ }));
+ }
+ }
+ }
+
+ @Override
+ public void onExpire(RaftSession session) {
+ releaseSession(session);
+ }
+
+ @Override
+ public void onClose(RaftSession session) {
+ releaseSession(session);
+ }
+
+ /**
+ * Applies a lock commit.
+ *
+ * @param commit the lock commit
+ */
+ protected void lock(Commit<Lock> commit) {
+ if (lock == null) {
+ lock = new LockHolder(
+ commit.value().id(),
+ commit.index(),
+ commit.session().sessionId().id(),
+ 0);
+ commit.session().publish(
+ AtomixDistributedLockEvents.LOCK,
+ SERIALIZER::encode,
+ new LockEvent(commit.value().id(), commit.index()));
+ } else if (commit.value().timeout() == 0) {
+ commit.session().publish(FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
+ } else if (commit.value().timeout() > 0) {
+ LockHolder holder = new LockHolder(
+ commit.value().id(),
+ commit.index(),
+ commit.session().sessionId().id(),
+ wallClock().getTime().unixTimestamp() + commit.value().timeout());
+ queue.add(holder);
+ timers.put(commit.index(), scheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> {
+ timers.remove(commit.index());
+ queue.remove(holder);
+ if (commit.session().getState().active()) {
+ commit.session().publish(
+ FAIL,
+ SERIALIZER::encode,
+ new LockEvent(commit.value().id(), commit.index()));
+ }
+ }));
+ } else {
+ LockHolder holder = new LockHolder(
+ commit.value().id(),
+ commit.index(),
+ commit.session().sessionId().id(),
+ 0);
+ queue.add(holder);
+ }
+ }
+
+ /**
+ * Applies an unlock commit.
+ *
+ * @param commit the unlock commit
+ */
+ protected void unlock(Commit<Unlock> commit) {
+ if (lock != null) {
+ if (lock.session != commit.session().sessionId().id()) {
+ return;
+ }
+
+ lock = queue.poll();
+ while (lock != null) {
+ Scheduled timer = timers.remove(lock.index);
+ if (timer != null) {
+ timer.cancel();
+ }
+
+ RaftSession session = sessions().getSession(lock.session);
+ if (session == null || session.getState() == RaftSession.State.EXPIRED
+ || session.getState() == RaftSession.State.CLOSED) {
+ lock = queue.poll();
+ } else {
+ session.publish(
+ AtomixDistributedLockEvents.LOCK,
+ SERIALIZER::encode,
+ new LockEvent(lock.id, commit.index()));
+ break;
+ }
+ }
+ }
+ }
+
+ private void releaseSession(RaftSession session) {
+ if (lock != null && lock.session == session.sessionId().id()) {
+ lock = queue.poll();
+ while (lock != null) {
+ if (lock.session == session.sessionId().id()) {
+ lock = queue.poll();
+ } else {
+ Scheduled timer = timers.remove(lock.index);
+ if (timer != null) {
+ timer.cancel();
+ }
+
+ RaftSession lockSession = sessions().getSession(lock.session);
+ if (lockSession == null || lockSession.getState() == RaftSession.State.EXPIRED
+ || lockSession.getState() == RaftSession.State.CLOSED) {
+ lock = queue.poll();
+ } else {
+ lockSession.publish(
+ AtomixDistributedLockEvents.LOCK,
+ SERIALIZER::encode,
+ new LockEvent(lock.id, lock.index));
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ private class LockHolder {
+ private final int id;
+ private final long index;
+ private final long session;
+ private final long expire;
+
+ public LockHolder(int id, long index, long session, long expire) {
+ this.id = id;
+ this.index = index;
+ this.session = session;
+ this.expire = expire;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("id", id)
+ .add("index", index)
+ .add("session", session)
+ .add("expire", expire)
+ .toString();
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/LockEvent.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/LockEvent.java
new file mode 100644
index 0000000..cddb41f
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/LockEvent.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Locked event.
+ */
+public class LockEvent {
+ private final int id;
+ private final long version;
+
+ public LockEvent() {
+ this(0, 0);
+ }
+
+ public LockEvent(int id, long version) {
+ this.id = id;
+ this.version = version;
+ }
+
+ /**
+ * Returns the lock ID.
+ *
+ * @return The lock ID.
+ */
+ public int id() {
+ return id;
+ }
+
+ /**
+ * Returns the lock version.
+ *
+ * @return The lock version.
+ */
+ public long version() {
+ return version;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("id", id)
+ .add("version", version)
+ .toString();
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockServiceTest.java
new file mode 100644
index 0000000..69942a7
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockServiceTest.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.impl.RaftContext;
+import io.atomix.protocols.raft.operation.OperationType;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.ServiceType;
+import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.service.impl.DefaultServiceContext;
+import io.atomix.protocols.raft.session.SessionId;
+import io.atomix.protocols.raft.session.impl.RaftSessionContext;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.protocols.raft.storage.snapshot.Snapshot;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotStore;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import io.atomix.storage.StorageLevel;
+import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.AtomixThreadFactory;
+import io.atomix.utils.concurrent.SingleThreadContextFactory;
+import io.atomix.utils.concurrent.ThreadContext;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.onosproject.store.service.DistributedPrimitive.Type.LEADER_ELECTOR;
+
+/**
+ * Distributed lock service test.
+ */
+public class AtomixDistributedLockServiceTest {
+ @Test
+ public void testSnapshot() throws Exception {
+ SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
+ .withPrefix("test")
+ .withStorageLevel(StorageLevel.MEMORY)
+ .build());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
+
+ AtomicLong index = new AtomicLong();
+ DefaultServiceContext context = mock(DefaultServiceContext.class);
+ expect(context.serviceType()).andReturn(ServiceType.from(LEADER_ELECTOR.name())).anyTimes();
+ expect(context.serviceName()).andReturn("test").anyTimes();
+ expect(context.serviceId()).andReturn(ServiceId.from(1)).anyTimes();
+ expect(context.executor()).andReturn(mock(ThreadContext.class)).anyTimes();
+ expect(context.currentIndex()).andReturn(index.get()).anyTimes();
+ expect(context.currentOperation()).andReturn(OperationType.COMMAND).anyTimes();
+
+ RaftContext server = mock(RaftContext.class);
+ expect(server.getProtocol()).andReturn(mock(RaftServerProtocol.class));
+
+ replay(context, server);
+
+ AtomixDistributedLockService service = new AtomixDistributedLockService();
+ service.init(context);
+
+ RaftSessionContext session = new RaftSessionContext(
+ SessionId.from(1),
+ MemberId.from("1"),
+ "test",
+ ServiceType.from(LEADER_ELECTOR.name()),
+ ReadConsistency.LINEARIZABLE,
+ 100,
+ 5000,
+ System.currentTimeMillis(),
+ context,
+ server,
+ new SingleThreadContextFactory(new AtomixThreadFactory()));
+ session.open();
+
+ service.lock(new DefaultCommit<>(
+ index.incrementAndGet(),
+ AtomixDistributedLockOperations.LOCK,
+ new AtomixDistributedLockOperations.Lock(1, 0),
+ session,
+ System.currentTimeMillis()));
+
+ try (SnapshotWriter writer = snapshot.openWriter()) {
+ service.snapshot(writer);
+ }
+
+ snapshot.complete();
+
+ service = new AtomixDistributedLockService();
+ try (SnapshotReader reader = snapshot.openReader()) {
+ service.install(reader);
+ }
+
+ service.unlock(new DefaultCommit<>(
+ index.incrementAndGet(),
+ AtomixDistributedLockOperations.UNLOCK,
+ new AtomixDistributedLockOperations.Unlock(1),
+ session,
+ System.currentTimeMillis()));
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockTest.java
new file mode 100644
index 0000000..cc32ba9
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockTest.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
+import org.junit.Test;
+import org.onosproject.store.service.Version;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Raft lock test.
+ */
+public class AtomixDistributedLockTest extends AtomixTestBase<AtomixDistributedLock> {
+ @Override
+ protected RaftService createService() {
+ return new AtomixDistributedLockService();
+ }
+
+ @Override
+ protected AtomixDistributedLock createPrimitive(RaftProxy proxy) {
+ return new AtomixDistributedLock(proxy);
+ }
+
+ /**
+ * Tests locking and unlocking a lock.
+ */
+ @Test
+ public void testLockUnlock() throws Throwable {
+ AtomixDistributedLock lock = newPrimitive("test-lock-unlock");
+ lock.lock().join();
+ lock.unlock().join();
+ }
+
+ /**
+ * Tests releasing a lock when the client's session is closed.
+ */
+ @Test
+ public void testReleaseOnClose() throws Throwable {
+ AtomixDistributedLock lock1 = newPrimitive("test-lock-release-on-close");
+ AtomixDistributedLock lock2 = newPrimitive("test-lock-release-on-close");
+ lock1.lock().join();
+ CompletableFuture<Version> future = lock2.lock();
+ lock1.close();
+ future.join();
+ }
+
+ /**
+ * Tests attempting to acquire a lock.
+ */
+ @Test
+ public void testTryLockFail() throws Throwable {
+ AtomixDistributedLock lock1 = newPrimitive("test-try-lock-fail");
+ AtomixDistributedLock lock2 = newPrimitive("test-try-lock-fail");
+
+ lock1.lock().join();
+
+ assertFalse(lock2.tryLock().join().isPresent());
+ }
+
+ /**
+ * Tests attempting to acquire a lock.
+ */
+ @Test
+ public void testTryLockSucceed() throws Throwable {
+ AtomixDistributedLock lock = newPrimitive("test-try-lock-succeed");
+ assertTrue(lock.tryLock().join().isPresent());
+ }
+
+ /**
+ * Tests attempting to acquire a lock with a timeout.
+ */
+ @Test
+ public void testTryLockFailWithTimeout() throws Throwable {
+ AtomixDistributedLock lock1 = newPrimitive("test-try-lock-fail-with-timeout");
+ AtomixDistributedLock lock2 = newPrimitive("test-try-lock-fail-with-timeout");
+
+ lock1.lock().join();
+
+ assertFalse(lock2.tryLock(Duration.ofSeconds(1)).join().isPresent());
+ }
+
+ /**
+ * Tests attempting to acquire a lock with a timeout.
+ */
+ @Test
+ public void testTryLockSucceedWithTimeout() throws Throwable {
+ AtomixDistributedLock lock1 = newPrimitive("test-try-lock-succeed-with-timeout");
+ AtomixDistributedLock lock2 = newPrimitive("test-try-lock-succeed-with-timeout");
+
+ lock1.lock().join();
+
+ CompletableFuture<Optional<Version>> future = lock2.tryLock(Duration.ofSeconds(1));
+ lock1.unlock().join();
+ assertTrue(future.join().isPresent());
+ }
+
+ /**
+ * Tests unlocking a lock with a blocking call in the event thread.
+ */
+ @Test
+ public void testBlockingUnlock() throws Throwable {
+ AtomixDistributedLock lock1 = newPrimitive("test-blocking-unlock");
+ AtomixDistributedLock lock2 = newPrimitive("test-blocking-unlock");
+
+ lock1.lock().thenRun(() -> {
+ lock1.unlock().join();
+ }).join();
+
+ lock2.lock().join();
+ }
+}
diff --git a/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java b/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
index 2f97c7d..4ba8a2c 100644
--- a/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
+++ b/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
@@ -25,6 +25,7 @@
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMultimapBuilder;
import org.onosproject.store.service.ConsistentTreeMapBuilder;
+import org.onosproject.store.service.DistributedLockBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
import org.onosproject.store.service.DocumentTreeBuilder;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
@@ -80,6 +81,11 @@
}
@Override
+ public DistributedLockBuilder lockBuilder() {
+ return null;
+ }
+
+ @Override
public LeaderElectorBuilder leaderElectorBuilder() {
return null;
}