Using net.jodah.expiringmap.ExpiringMap for tracking TTL expiration of database entries.
Minor Javadoc updates.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
index f83b042..6d99ba7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -33,7 +33,8 @@
public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
- private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap.create();
+ private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap
+ .create();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService clusterCommunicator;
@@ -61,11 +62,7 @@
@Override
public Lock create(String path) {
- return new DistributedLock(
- path,
- databaseService,
- clusterService,
- this);
+ return new DistributedLock(path, databaseService, clusterService, this);
}
@Override
@@ -80,21 +77,19 @@
throw new UnsupportedOperationException();
}
- protected CompletableFuture<Void> lockIfAvailable(
- Lock lock,
- long waitTimeMillis,
- int leaseDurationMillis) {
+ protected CompletableFuture<Void> lockIfAvailable(Lock lock,
+ long waitTimeMillis, int leaseDurationMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
- locksToAcquire.put(
- lock.path(),
- new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
+ locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis,
+ leaseDurationMillis, future));
return future;
}
private class LockEventMessageListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
- TableModificationEvent event = DatabaseStateMachine.SERIALIZER.decode(message.payload());
+ TableModificationEvent event = DatabaseStateMachine.SERIALIZER
+ .decode(message.payload());
if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
return;
}
@@ -110,15 +105,20 @@
return;
}
- Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
- while (existingRequestIterator.hasNext()) {
- LockRequest request = existingRequestIterator.next();
- if (request.expirationTime().isAfter(DateTime.now())) {
- existingRequestIterator.remove();
- } else {
- if (request.lock().tryLock(request.leaseDurationMillis())) {
- request.future().complete(null);
- existingRequests.remove(0);
+ synchronized (existingRequests) {
+
+ Iterator<LockRequest> existingRequestIterator = existingRequests
+ .iterator();
+ while (existingRequestIterator.hasNext()) {
+ LockRequest request = existingRequestIterator.next();
+ if (request.expirationTime().isAfter(DateTime.now())) {
+ existingRequestIterator.remove();
+ } else {
+ if (request.lock().tryLock(
+ request.leaseDurationMillis())) {
+ request.future().complete(null);
+ existingRequestIterator.remove();
+ }
}
}
}
@@ -133,14 +133,12 @@
private final int leaseDurationMillis;
private final CompletableFuture<Void> future;
- public LockRequest(
- Lock lock,
- long waitTimeMillis,
- int leaseDurationMillis,
- CompletableFuture<Void> future) {
+ public LockRequest(Lock lock, long waitTimeMillis,
+ int leaseDurationMillis, CompletableFuture<Void> future) {
this.lock = lock;
- this.expirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
+ this.expirationTime = DateTime.now().plusMillis(
+ (int) waitTimeMillis);
this.leaseDurationMillis = leaseDurationMillis;
this.future = future;
}