1. Fixed a synchronization issue with database update processing and expiry tracking.
2. Fixed a synchronization issue with MapDBLog appendEntries method.
3. DatabaseClient now uses ProtocolClient to interact with Raft cluster.
4. Misc javdoc and logging improvements
Change-Id: I147eb5bf859cf9827df452d62ab415d643a00aa4
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
index c8b16e4..a8941aa 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -1,10 +1,13 @@
package org.onlab.onos.store.service.impl;
+import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -23,18 +26,23 @@
import org.onlab.onos.store.service.LockService;
import org.slf4j.Logger;
-import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimaps;
@Component(immediate = true)
@Service
public class DistributedLockManager implements LockService {
+ private static final ExecutorService THREAD_POOL =
+ Executors.newCachedThreadPool(namedThreads("lock-manager-%d"));
+
private final Logger log = getLogger(getClass());
public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
- private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap
- .create();
+ private final ListMultimap<String, LockRequest> locksToAcquire =
+ Multimaps.synchronizedListMultimap(LinkedListMultimap.<String, LockRequest>create());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService clusterCommunicator;
@@ -56,8 +64,9 @@
@Deactivate
public void deactivate() {
+ clusterCommunicator.removeSubscriber(DatabaseStateMachine.DATABASE_UPDATE_EVENTS);
locksToAcquire.clear();
- log.info("Started.");
+ log.info("Stopped.");
}
@Override
@@ -77,11 +86,19 @@
throw new UnsupportedOperationException();
}
- protected CompletableFuture<Void> lockIfAvailable(Lock lock,
- long waitTimeMillis, int leaseDurationMillis) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis,
- leaseDurationMillis, future));
+ /**
+ * Attempts to acquire the lock as soon as it becomes available.
+ * @param lock lock to acquire.
+ * @param waitTimeMillis maximum time to wait before giving up.
+ * @param leaseDurationMillis the duration for which to acquire the lock initially.
+ * @return Future lease expiration date.
+ */
+ protected CompletableFuture<DateTime> lockIfAvailable(
+ Lock lock,
+ long waitTimeMillis,
+ int leaseDurationMillis) {
+ CompletableFuture<DateTime> future = new CompletableFuture<>();
+ locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
return future;
}
@@ -90,37 +107,46 @@
public void handle(ClusterMessage message) {
TableModificationEvent event = DatabaseStateMachine.SERIALIZER
.decode(message.payload());
- if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
- return;
+ if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
+ event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
+ THREAD_POOL.submit(new RetryLockTask(event.key()));
}
+ }
+ }
- log.info("Received a lock available event for path: {}", event.key());
+ private class RetryLockTask implements Runnable {
- String path = event.key();
+ private final String path;
+
+ public RetryLockTask(String path) {
+ this.path = path;
+ }
+
+ @Override
+ public void run() {
if (!locksToAcquire.containsKey(path)) {
return;
}
- if (event.type() == TableModificationEvent.Type.ROW_DELETED) {
- List<LockRequest> existingRequests = locksToAcquire.get(path);
- if (existingRequests == null) {
- return;
- }
+ List<LockRequest> existingRequests = locksToAcquire.get(path);
+ if (existingRequests == null || existingRequests.isEmpty()) {
+ return;
+ }
+ log.info("Path {} is now available for locking. There are {} outstanding "
+ + "requests for it.",
+ path, existingRequests.size());
- synchronized (existingRequests) {
-
- Iterator<LockRequest> existingRequestIterator = existingRequests
- .iterator();
- while (existingRequestIterator.hasNext()) {
- LockRequest request = existingRequestIterator.next();
- if (request.expirationTime().isAfter(DateTime.now())) {
+ synchronized (existingRequests) {
+ Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
+ while (existingRequestIterator.hasNext()) {
+ LockRequest request = existingRequestIterator.next();
+ if (DateTime.now().isAfter(request.requestExpirationTime())) {
+ // request expired.
+ existingRequestIterator.remove();
+ } else {
+ if (request.lock().tryLock(request.leaseDurationMillis())) {
+ request.future().complete(DateTime.now().plusMillis(request.leaseDurationMillis()));
existingRequestIterator.remove();
- } else {
- if (request.lock().tryLock(
- request.leaseDurationMillis())) {
- request.future().complete(null);
- existingRequestIterator.remove();
- }
}
}
}
@@ -131,16 +157,15 @@
private class LockRequest {
private final Lock lock;
- private final DateTime expirationTime;
+ private final DateTime requestExpirationTime;
private final int leaseDurationMillis;
- private final CompletableFuture<Void> future;
+ private final CompletableFuture<DateTime> future;
public LockRequest(Lock lock, long waitTimeMillis,
- int leaseDurationMillis, CompletableFuture<Void> future) {
+ int leaseDurationMillis, CompletableFuture<DateTime> future) {
this.lock = lock;
- this.expirationTime = DateTime.now().plusMillis(
- (int) waitTimeMillis);
+ this.requestExpirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
this.leaseDurationMillis = leaseDurationMillis;
this.future = future;
}
@@ -149,15 +174,15 @@
return lock;
}
- public DateTime expirationTime() {
- return expirationTime;
+ public DateTime requestExpirationTime() {
+ return requestExpirationTime;
}
public int leaseDurationMillis() {
return leaseDurationMillis;
}
- public CompletableFuture<Void> future() {
+ public CompletableFuture<DateTime> future() {
return future;
}
}