blob: 629c50ca5336c0ad307db2e7ac0e1de7606dc04d [file] [log] [blame]
package org.onlab.onos.store.service.impl;
import static com.google.common.base.Verify.verify;
import static org.slf4j.LoggerFactory.getLogger;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.joda.time.DateTime;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.Lock;
import org.onlab.onos.store.service.VersionedValue;
import org.slf4j.Logger;
/**
 * A distributed lock implementation.
 * <p>
 * Ownership is recorded as an entry in the
 * {@link DistributedLockManager#ONOS_LOCK_TABLE_NAME} table, keyed by the lock
 * path and holding this instance's lock id. Lease expiration is tracked
 * locally via {@link #isLocked()}.
 */
public class DistributedLock implements Lock {

    private final Logger log = getLogger(getClass());

    private final DistributedLockManager lockManager;
    private final DatabaseService databaseService;
    private final String path;

    // Written by whichever thread acquires/extends the lock and read by
    // isLocked(); volatile so readers observe the most recent lease deadline.
    private volatile DateTime lockExpirationTime;
    private final AtomicBoolean isLocked = new AtomicBoolean(false);
    private volatile long epoch = 0;
    private final byte[] lockId;

    /**
     * Creates a lock handle for the given path.
     * <p>
     * The lock id is built from a random UUID and the local node id,
     * separated by {@code "::"} and encoded as UTF-8.
     *
     * @param path path identifying the lock
     * @param databaseService database service holding the lock table
     * @param clusterService cluster service used to look up the local node id
     * @param lockManager manager used to wait for the lock when it is held elsewhere
     */
    public DistributedLock(
            String path,
            DatabaseService databaseService,
            ClusterService clusterService,
            DistributedLockManager lockManager) {

        this.path = path;
        this.databaseService = databaseService;
        this.lockManager = lockManager;
        this.lockId =
                (UUID.randomUUID().toString() + "::" +
                        clusterService.getLocalNode().id().toString()).
                        getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Returns the path identifying this lock.
     *
     * @return lock path
     */
    @Override
    public String path() {
        return path;
    }

    /**
     * Acquires the lock, blocking until {@link #lockAsync(int)} completes.
     *
     * @param leaseDurationMillis lease duration in milliseconds
     * @throws InterruptedException if interrupted while waiting
     * @throws DatabaseException if the asynchronous acquisition fails
     */
    @Override
    public void lock(int leaseDurationMillis) throws InterruptedException {
        try {
            lockAsync(leaseDurationMillis).get();
        } catch (ExecutionException e) {
            throw new DatabaseException(e);
        }
    }

    /**
     * Acquires the lock asynchronously.
     * <p>
     * Returns an already completed future if the lock is held by this
     * instance or can be taken immediately; otherwise the request is handed
     * to the lock manager. A {@link DatabaseException} raised along the way
     * is reported through an exceptionally completed future.
     *
     * @param leaseDurationMillis lease duration in milliseconds
     * @return future that completes once the lock has been acquired
     */
    @Override
    public CompletableFuture<Void> lockAsync(int leaseDurationMillis) {
        try {
            if (isLocked() || tryLock(leaseDurationMillis)) {
                return CompletableFuture.<Void>completedFuture(null);
            }
            return lockManager.lockIfAvailable(this, leaseDurationMillis);
        } catch (DatabaseException e) {
            CompletableFuture<Void> lockFuture = new CompletableFuture<>();
            lockFuture.completeExceptionally(e);
            return lockFuture;
        }
    }

    /**
     * Attempts to acquire the lock without waiting.
     * <p>
     * On success the version of the stored entry is recorded as the lock
     * epoch and the local lease deadline is set.
     *
     * @param leaseDurationMillis lease duration in milliseconds
     * @return true if the lock entry was created by this call, false otherwise
     */
    @Override
    public boolean tryLock(int leaseDurationMillis) {
        if (!databaseService.putIfAbsent(
                DistributedLockManager.ONOS_LOCK_TABLE_NAME,
                path,
                lockId)) {
            return false;
        }

        VersionedValue vv =
                databaseService.get(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path);
        verify(Arrays.equals(vv.value(), lockId));

        // Publish the epoch and the lease deadline before flipping the flag so
        // that a concurrent isLocked() never pairs the flag with a stale deadline.
        epoch = vv.version();
        lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
        isLocked.set(true);
        return true;
    }

    /**
     * Attempts to acquire the lock, waiting up to the given time.
     *
     * @param waitTimeMillis maximum time to wait in milliseconds
     * @param leaseDurationMillis lease duration in milliseconds
     * @return true if the lock was acquired, false if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     * @throws DatabaseException if the pending acquisition fails
     */
    @Override
    public boolean tryLock(
            int waitTimeMillis,
            int leaseDurationMillis) throws InterruptedException {
        if (isLocked() || tryLock(leaseDurationMillis)) {
            return true;
        }

        CompletableFuture<Void> future =
                lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
        try {
            future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException e) {
            throw new DatabaseException(e);
        } catch (TimeoutException e) {
            log.debug("Timed out waiting to acquire lock for {}", path);
            return false;
        }
    }

    /**
     * Checks whether this instance currently holds the lock.
     * <p>
     * If the local lease deadline has passed, the locked flag is cleared
     * and false is returned.
     *
     * @return true if the lock is held and the lease has not expired
     */
    @Override
    public boolean isLocked() {
        if (!isLocked.get()) {
            return false;
        }
        // We rely on local information to check if the lock expired.
        // This should make this call light weight, while still
        // retaining the safety guarantees.
        if (DateTime.now().isAfter(lockExpirationTime)) {
            isLocked.set(false);
            return false;
        }
        return true;
    }

    /**
     * Returns the epoch recorded when the lock was last acquired.
     *
     * @return lock epoch; 0 if the lock has never been acquired
     */
    @Override
    public long epoch() {
        return epoch;
    }

    /**
     * Releases the lock if it is held by this instance.
     * <p>
     * The locked flag is cleared only when the stored entry still carried
     * this instance's lock id and was removed.
     */
    @Override
    public void unlock() {
        if (!isLocked()) {
            return;
        }
        if (databaseService.removeIfValueMatches(
                DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId)) {
            isLocked.set(false);
        }
    }

    /**
     * Extends the lease of a lock that is already held.
     * <p>
     * The stored entry is rewritten with the same lock id; the local lease
     * deadline is moved only if that conditional write succeeds.
     *
     * @param leaseDurationMillis new lease duration in milliseconds, counted from now
     * @return true if the lease was extended, false otherwise
     */
    @Override
    public boolean extendExpiration(int leaseDurationMillis) {
        if (!isLocked()) {
            log.warn("Ignoring request to extend expiration for lock {}."
                    + " ExtendExpiration must be called for locks that are already acquired.", path);
            return false;
        }

        if (databaseService.putIfValueMatches(
                DistributedLockManager.ONOS_LOCK_TABLE_NAME,
                path,
                lockId,
                lockId)) {
            DateTime newExpiration = DateTime.now().plusMillis(leaseDurationMillis);
            lockExpirationTime = newExpiration;
            // Both placeholders need an argument: the lock path and the new deadline.
            log.debug("Succeeded in extending lock {} expiration time to {}", path, newExpiration);
            return true;
        }

        log.info("Failed to extend expiration for {}", path);
        return false;
    }
}