blob: c5a425006e208f0382a1bc037dcd3a6af2c86cd4 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import io.atomix.protocols.raft.proxy.RaftProxy;
import io.atomix.utils.concurrent.Futures;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncDistributedLock;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.Version;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;
/**
* Atomix lock implementation.
*/
public class AtomixDistributedLock extends AbstractRaftPrimitive implements AsyncDistributedLock {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
.register(AtomixDistributedLockOperations.NAMESPACE)
.register(AtomixDistributedLockEvents.NAMESPACE)
.build());
private final Map<Integer, CompletableFuture<Version>> futures = new ConcurrentHashMap<>();
private final AtomicInteger id = new AtomicInteger();
private int lock;
public AtomixDistributedLock(RaftProxy proxy) {
super(proxy);
proxy.addStateChangeListener(this::handleStateChange);
proxy.addEventListener(AtomixDistributedLockEvents.LOCK, SERIALIZER::decode, this::handleLocked);
proxy.addEventListener(AtomixDistributedLockEvents.FAIL, SERIALIZER::decode, this::handleFailed);
}
private void handleLocked(LockEvent event) {
CompletableFuture<Version> future = futures.remove(event.id());
if (future != null) {
this.lock = event.id();
future.complete(new Version(event.version()));
}
}
private void handleFailed(LockEvent event) {
CompletableFuture<Version> future = futures.remove(event.id());
if (future != null) {
future.complete(null);
}
}
private void handleStateChange(RaftProxy.State state) {
if (state != RaftProxy.State.CONNECTED) {
Iterator<Map.Entry<Integer, CompletableFuture<Version>>> iterator = futures.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Integer, CompletableFuture<Version>> entry = iterator.next();
entry.getValue().completeExceptionally(new StorageException.Unavailable());
proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(entry.getKey()));
iterator.remove();
}
lock = 0;
}
}
@Override
public CompletableFuture<Version> lock() {
RaftProxy.State state = proxy.getState();
if (state != RaftProxy.State.CONNECTED) {
return Futures.exceptionalFuture(new StorageException.Unavailable());
}
CompletableFuture<Version> future = new CompletableFuture<>();
int id = this.id.incrementAndGet();
futures.put(id, future);
proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, -1)).whenComplete((result, error) -> {
if (error != null) {
futures.remove(id);
future.completeExceptionally(error);
}
});
return future;
}
@Override
public CompletableFuture<Optional<Version>> tryLock() {
RaftProxy.State state = proxy.getState();
if (state != RaftProxy.State.CONNECTED) {
return Futures.exceptionalFuture(new StorageException.Unavailable());
}
CompletableFuture<Version> future = new CompletableFuture<>();
int id = this.id.incrementAndGet();
futures.put(id, future);
proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, 0)).whenComplete((result, error) -> {
if (error != null) {
futures.remove(id);
future.completeExceptionally(error);
}
});
return future.thenApply(Optional::ofNullable);
}
@Override
public CompletableFuture<Optional<Version>> tryLock(Duration timeout) {
RaftProxy.State state = proxy.getState();
if (state != RaftProxy.State.CONNECTED) {
return Futures.exceptionalFuture(new StorageException.Unavailable());
}
CompletableFuture<Version> future = new CompletableFuture<>();
int id = this.id.incrementAndGet();
futures.put(id, future);
proxy.invoke(LOCK, SERIALIZER::encode, new Lock(id, timeout.toMillis())).whenComplete((result, error) -> {
if (error != null) {
futures.remove(id);
future.completeExceptionally(error);
}
});
return future.thenApply(Optional::ofNullable);
}
@Override
public CompletableFuture<Void> unlock() {
int lock = this.lock;
this.lock = 0;
if (lock != 0) {
return proxy.invoke(UNLOCK, SERIALIZER::encode, new Unlock(lock));
}
return CompletableFuture.completedFuture(null);
}
/**
* Closes the lock.
*
* @return a future to be completed once the lock has been closed
*/
public CompletableFuture<Void> close() {
return proxy.close();
}
}