1. Fixed a synchronization issue with database update processing and expiry tracking.
2. Fixed a synchronization issue with MapDBLog appendEntries method.
3. DatabaseClient now uses ProtocolClient to interact with Raft cluster.
4. Misc javadoc and logging improvements.
Change-Id: I147eb5bf859cf9827df452d62ab415d643a00aa4
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseException.java b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseException.java
index bbc2daf..1468d1b 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseException.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseException.java
@@ -19,4 +19,18 @@
public DatabaseException() {
};
+
+ public static class Timeout extends DatabaseException {
+ public Timeout(String message, Throwable t) {
+ super(message, t);
+ }
+
+ public Timeout(String message) {
+ super(message);
+ }
+
+ public Timeout(Throwable t) {
+ super(t);
+ }
+ }
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 72e4b95..876f6cc 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -1,21 +1,24 @@
package org.onlab.onos.store.service.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import net.kuujo.copycat.Copycat;
+import net.kuujo.copycat.cluster.Member;
+import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchWriteRequest;
@@ -28,7 +31,7 @@
/**
* Client for interacting with the Copycat Raft cluster.
*/
-public class DatabaseClient {
+public class DatabaseClient implements EventHandler<LeaderElectEvent> {
private static final int RETRIES = 5;
@@ -36,137 +39,101 @@
private final Logger log = getLogger(getClass());
- private final Copycat copycat;
+ private final DatabaseProtocolService protocol;
+ private volatile ProtocolClient copycat = null;
+ private volatile Member currentLeader = null;
- public DatabaseClient(Copycat copycat) {
- this.copycat = checkNotNull(copycat);
+ public DatabaseClient(DatabaseProtocolService protocol) {
+ this.protocol = protocol;
+ }
+
+ @Override
+ public void handle(LeaderElectEvent event) {
+ Member newLeader = event.leader();
+ if (newLeader != null && !newLeader.equals(currentLeader)) {
+ currentLeader = newLeader;
+ if (copycat != null) {
+ copycat.close();
+ }
+ copycat = protocol.createClient((TcpMember) currentLeader);
+ copycat.connect();
+ }
+ }
+
+ private String nextRequestId() {
+ return UUID.randomUUID().toString();
}
public void waitForLeader() {
- if (copycat.leader() != null) {
+ if (currentLeader != null) {
return;
}
log.info("No leader in cluster, waiting for election.");
- final CountDownLatch latch = new CountDownLatch(1);
- final EventHandler<LeaderElectEvent> leaderLsnr = new EventHandler<LeaderElectEvent>() {
- @Override
- public void handle(LeaderElectEvent event) {
- log.info("Leader chosen: {}", event);
- latch.countDown();
- }
- };
-
- copycat.event(LeaderElectEvent.class).registerHandler(leaderLsnr);
try {
- while (copycat.leader() == null) {
- latch.await(200, TimeUnit.MILLISECONDS);
+ while (currentLeader == null) {
+ Thread.sleep(200);
}
- log.info("Leader appeared: {}", copycat.leader());
+ log.info("Leader appeared: {}", currentLeader);
return;
} catch (InterruptedException e) {
log.error("Interrupted while waiting for Leader", e);
Thread.currentThread().interrupt();
- } finally {
- copycat.event(LeaderElectEvent.class).unregisterHandler(leaderLsnr);
+ }
+ }
+
+ private <T> T submit(String operationName, Object... args) {
+ waitForLeader();
+ if (currentLeader == null) {
+ throw new DatabaseException("Raft cluster does not have a leader.");
+ }
+
+ SubmitRequest request =
+ new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
+
+ CompletableFuture<SubmitResponse> submitResponse = copycat.submit(request);
+
+ log.debug("Sent {} to {}", request, currentLeader);
+
+ try {
+ return (T) submitResponse.get(TIMEOUT_MS, TimeUnit.MILLISECONDS).result();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new DatabaseException(e);
+ } catch (TimeoutException e) {
+ throw new DatabaseException.Timeout(e);
}
}
public boolean createTable(String tableName) {
- waitForLeader();
- CompletableFuture<Boolean> future = copycat.submit("createTable", tableName);
- try {
- return future.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- }
+ return submit("createTable", tableName);
}
public boolean createTable(String tableName, int ttlMillis) {
- waitForLeader();
- CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName);
- try {
- return future.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- }
+ return submit("createTable", tableName, ttlMillis);
}
public void dropTable(String tableName) {
- waitForLeader();
- CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
- try {
- future.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- }
+ submit("dropTable", tableName);
}
public void dropAllTables() {
- waitForLeader();
- CompletableFuture<Void> future = copycat.submit("dropAllTables");
- try {
- future.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- }
+ submit("dropAllTables");
}
public Set<String> listTables() {
- waitForLeader();
- try {
- for (int i = 0; i < RETRIES; ++i) {
- CompletableFuture<Set<String>> future = copycat.submit("listTables");
- try {
- return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- log.debug("Timed out retrying {}", i);
- future.cancel(true);
- waitForLeader();
- }
- }
- // TODO: proper timeout handling
- log.error("Timed out");
- return Collections.emptySet();
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- }
+ return submit("listTables");
}
public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
- waitForLeader();
- CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
- try {
- return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- } catch (TimeoutException e) {
- throw new DatabaseException(e);
- }
+ return submit("read", batchRequest);
}
public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
- waitForLeader();
- CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
- try {
- return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- } catch (TimeoutException e) {
- throw new DatabaseException(e);
- }
+ return submit("write", batchRequest);
}
public Map<String, VersionedValue> getAll(String tableName) {
- waitForLeader();
- CompletableFuture<Map<String, VersionedValue>> future = copycat.submit("getAll", tableName);
- try {
- return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- } catch (TimeoutException e) {
- throw new DatabaseException(e);
- }
+ return submit("getAll", tableName);
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
index 62cf584..2fba52e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
@@ -16,10 +16,14 @@
package org.onlab.onos.store.service.impl;
+import static org.onlab.util.Tools.namedThreads;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,6 +44,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.MoreObjects;
+
/**
* Plugs into the database update stream and track the TTL of entries added to
* the database. For tables with pre-configured finite TTL, this class has
@@ -48,6 +54,9 @@
public class DatabaseEntryExpirationTracker implements
DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
+ private static final ExecutorService THREAD_POOL =
+ Executors.newCachedThreadPool(namedThreads("database-stale-entry-expirer-%d"));
+
private final Logger log = LoggerFactory.getLogger(getClass());
private final DatabaseService databaseService;
@@ -74,7 +83,7 @@
@Override
public void tableModified(TableModificationEvent event) {
- log.debug("Received a table modification event {}", event);
+ log.debug("{}: Received {}", localNode.id(), event);
if (!tableEntryExpirationMap.containsKey(event.tableName())) {
return;
@@ -89,8 +98,8 @@
map.remove(row, eventVersion);
if (isLocalMemberLeader.get()) {
try {
- // FIXME: The broadcast message should be sent to self.
- clusterCommunicator.broadcast(new ClusterMessage(
+ log.debug("Broadcasting {} to the entire cluster", event);
+ clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
DatabaseStateMachine.SERIALIZER.encode(event)));
} catch (IOException e) {
@@ -119,8 +128,6 @@
tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
.expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
.expirationListener(expirationObserver)
- // TODO: make the expiration policy configurable.
- // Do we need to support expiration based on last access time?
.expirationPolicy(ExpirationPolicy.CREATED).build());
}
}
@@ -135,6 +142,23 @@
ExpirationListener<DatabaseRow, Long> {
@Override
public void expired(DatabaseRow row, Long version) {
+ THREAD_POOL.submit(new ExpirationTask(row, version));
+ }
+ }
+
+ private class ExpirationTask implements Runnable {
+
+ private final DatabaseRow row;
+ private final Long version;
+
+ public ExpirationTask(DatabaseRow row, Long version) {
+ this.row = row;
+ this.version = version;
+ }
+
+ @Override
+ public void run() {
+ log.debug("Received an expiration event for {}, version: {}", row, version);
Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(row.tableName);
try {
if (isLocalMemberLeader.get()) {
@@ -142,7 +166,7 @@
row.key, version)) {
log.info("Entry in database was updated right before its expiration.");
} else {
- log.info("Successfully expired old entry with key ({}) from table ({})",
+ log.debug("Successfully expired old entry with key ({}) from table ({})",
row.key, row.tableName);
}
} else {
@@ -164,6 +188,9 @@
@Override
public void handle(LeaderElectEvent event) {
isLocalMemberLeader.set(localMember.equals(event.leader()));
+ if (isLocalMemberLeader.get()) {
+ log.info("{} is now the leader of Raft cluster", localNode.id());
+ }
}
/**
@@ -180,6 +207,14 @@
}
@Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("tableName", tableName)
+ .add("key", key)
+ .toString();
+ }
+
+ @Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
@@ -204,6 +239,7 @@
if (!tableEntryExpirationMap.isEmpty()) {
return;
}
+ log.debug("Received a snapshot installed notification");
for (String tableName : state.getTableNames()) {
TableMetadata metadata = state.getTableMetadata(tableName);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 645de00..65b0544 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -173,13 +173,16 @@
Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
ClusterMessagingProtocol.SERIALIZER);
+ client = new DatabaseClient(copycatMessagingProtocol);
+
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
+ copycat.event(LeaderElectEvent.class).registerHandler(client);
copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
copycat.start().get();
- client = new DatabaseClient(copycat);
+ client = new DatabaseClient(copycatMessagingProtocol);
client.waitForLeader();
log.info("Started.");
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 5958bb7..f6ea217 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -1,5 +1,6 @@
package org.onlab.onos.store.service.impl;
+import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.ByteArrayInputStream;
@@ -9,6 +10,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
@@ -49,6 +52,9 @@
private final Logger log = getLogger(getClass());
+ private final ExecutorService updatesExecutor =
+ Executors.newSingleThreadExecutor(namedThreads("database-statemachine-updates"));
+
// message subject for database update notifications.
public static final MessageSubject DATABASE_UPDATE_EVENTS =
new MessageSubject("database-update-events");
@@ -88,8 +94,7 @@
}
@Command
- public boolean createTableWithExpiration(String tableName) {
- int ttlMillis = 10000;
+ public boolean createTable(String tableName, Integer ttlMillis) {
TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
return createTable(metadata);
}
@@ -100,18 +105,32 @@
return false;
}
state.createTable(metadata);
- for (DatabaseUpdateEventListener listener : listeners) {
- listener.tableCreated(metadata);
- }
+
+ updatesExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.tableCreated(metadata);
+ }
+ }
+ });
+
return true;
}
@Command
public boolean dropTable(String tableName) {
if (state.removeTable(tableName)) {
- for (DatabaseUpdateEventListener listener : listeners) {
- listener.tableDeleted(tableName);
- }
+
+ updatesExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.tableDeleted(tableName);
+ }
+ }
+ });
+
return true;
}
return false;
@@ -121,11 +140,18 @@
public boolean dropAllTables() {
Set<String> tableNames = state.getTableNames();
state.removeAllTables();
- for (DatabaseUpdateEventListener listener : listeners) {
- for (String tableName : tableNames) {
- listener.tableDeleted(tableName);
+
+ updatesExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ for (String tableName : tableNames) {
+ listener.tableDeleted(tableName);
+ }
+ }
}
- }
+ });
+
return true;
}
@@ -273,12 +299,18 @@
}
// notify listeners of table mod events.
- for (DatabaseUpdateEventListener listener : listeners) {
- for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
- log.trace("Publishing table modification event: {}", tableModificationEvent);
- listener.tableModified(tableModificationEvent);
+
+ updatesExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
+ log.trace("Publishing table modification event: {}", tableModificationEvent);
+ listener.tableModified(tableModificationEvent);
+ }
+ }
}
- }
+ });
return results;
}
@@ -397,10 +429,15 @@
this.state = SERIALIZER.decode(data);
}
- // FIXME: synchronize.
- for (DatabaseUpdateEventListener listener : listeners) {
- listener.snapshotInstalled(state);
- }
+ updatesExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.snapshotInstalled(state);
+ }
+ }
+ });
+
} catch (Exception e) {
log.error("Failed to install from snapshot", e);
throw new SnapshotException(e);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
index cef53a5..a3fbc0d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -1,5 +1,7 @@
package org.onlab.onos.store.service.impl;
+import static org.slf4j.LoggerFactory.getLogger;
+
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -11,12 +13,17 @@
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.Lock;
+import org.slf4j.Logger;
/**
* A distributed lock implementation.
*/
public class DistributedLock implements Lock {
+ private final Logger log = getLogger(getClass());
+
+ private static final long MAX_WAIT_TIME_MS = 100000000L;
+
private final DistributedLockManager lockManager;
private final DatabaseService databaseService;
private final String path;
@@ -44,54 +51,60 @@
@Override
public void lock(int leaseDurationMillis) {
-
if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
// Nothing to do.
// Current expiration time is beyond what is requested.
return;
} else {
- tryLock(Long.MAX_VALUE, leaseDurationMillis);
+ tryLock(MAX_WAIT_TIME_MS, leaseDurationMillis);
}
}
@Override
public boolean tryLock(int leaseDurationMillis) {
- return databaseService.putIfAbsent(
+ if (databaseService.putIfAbsent(
DistributedLockManager.ONOS_LOCK_TABLE_NAME,
path,
- lockId);
+ lockId)) {
+ isLocked.set(true);
+ lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
+ return true;
+ }
+ return false;
}
@Override
public boolean tryLock(
long waitTimeMillis,
int leaseDurationMillis) {
- if (!tryLock(leaseDurationMillis)) {
- CompletableFuture<Void> future =
- lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
- try {
- future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
- } catch (ExecutionException | InterruptedException e) {
- // TODO: ExecutionException could indicate something
- // wrong with the backing database.
- // Throw an exception?
- return false;
- } catch (TimeoutException e) {
- return false;
- }
+ if (tryLock(leaseDurationMillis)) {
+ return true;
}
- isLocked.set(true);
- lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
- return true;
+
+ CompletableFuture<DateTime> future =
+ lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
+ try {
+ lockExpirationTime = future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
+ return true;
+ } catch (ExecutionException | InterruptedException e) {
+ log.error("Encountered an exception trying to acquire lock for " + path, e);
+ // TODO: ExecutionException could indicate something
+ // wrong with the backing database.
+ // Throw an exception?
+ return false;
+ } catch (TimeoutException e) {
+ log.debug("Timed out waiting to acquire lock for {}", path);
+ return false;
+ }
}
@Override
public boolean isLocked() {
if (isLocked.get()) {
// We rely on local information to check
- // if the expired.
+ // if the lock expired.
// This should should make this call
- // light weight, which still retaining the same
+ // light weight, while still retaining the
// safety guarantees.
if (DateTime.now().isAfter(lockExpirationTime)) {
isLocked.set(false);
@@ -108,17 +121,30 @@
if (!isLocked()) {
return;
} else {
- isLocked.set(false);
- databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
+ if (databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId)) {
+ isLocked.set(false);
+ }
}
}
@Override
public boolean extendExpiration(int leaseDurationMillis) {
- if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
+ if (!isLocked()) {
+ log.warn("Ignoring request to extend expiration for lock {}."
+ + " ExtendExpiration must be called for locks that are already acquired.", path);
+ }
+
+ if (databaseService.putIfValueMatches(
+ DistributedLockManager.ONOS_LOCK_TABLE_NAME,
+ path,
+ lockId,
+ lockId)) {
+ lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
+ log.debug("Succeeded in extending lock {} expiration time to {}", lockExpirationTime);
return true;
} else {
- return tryLock(leaseDurationMillis);
+ log.info("Failed to extend expiration for {}", path);
+ return false;
}
}
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
index c8b16e4..a8941aa 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -1,10 +1,13 @@
package org.onlab.onos.store.service.impl;
+import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -23,18 +26,23 @@
import org.onlab.onos.store.service.LockService;
import org.slf4j.Logger;
-import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimaps;
@Component(immediate = true)
@Service
public class DistributedLockManager implements LockService {
+ private static final ExecutorService THREAD_POOL =
+ Executors.newCachedThreadPool(namedThreads("lock-manager-%d"));
+
private final Logger log = getLogger(getClass());
public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
- private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap
- .create();
+ private final ListMultimap<String, LockRequest> locksToAcquire =
+ Multimaps.synchronizedListMultimap(LinkedListMultimap.<String, LockRequest>create());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService clusterCommunicator;
@@ -56,8 +64,9 @@
@Deactivate
public void deactivate() {
+ clusterCommunicator.removeSubscriber(DatabaseStateMachine.DATABASE_UPDATE_EVENTS);
locksToAcquire.clear();
- log.info("Started.");
+ log.info("Stopped.");
}
@Override
@@ -77,11 +86,19 @@
throw new UnsupportedOperationException();
}
- protected CompletableFuture<Void> lockIfAvailable(Lock lock,
- long waitTimeMillis, int leaseDurationMillis) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis,
- leaseDurationMillis, future));
+ /**
+ * Attempts to acquire the lock as soon as it becomes available.
+ * @param lock lock to acquire.
+ * @param waitTimeMillis maximum time to wait before giving up.
+ * @param leaseDurationMillis the duration for which to acquire the lock initially.
+ * @return Future lease expiration date.
+ */
+ protected CompletableFuture<DateTime> lockIfAvailable(
+ Lock lock,
+ long waitTimeMillis,
+ int leaseDurationMillis) {
+ CompletableFuture<DateTime> future = new CompletableFuture<>();
+ locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
return future;
}
@@ -90,37 +107,46 @@
public void handle(ClusterMessage message) {
TableModificationEvent event = DatabaseStateMachine.SERIALIZER
.decode(message.payload());
- if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
- return;
+ if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
+ event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
+ THREAD_POOL.submit(new RetryLockTask(event.key()));
}
+ }
+ }
- log.info("Received a lock available event for path: {}", event.key());
+ private class RetryLockTask implements Runnable {
- String path = event.key();
+ private final String path;
+
+ public RetryLockTask(String path) {
+ this.path = path;
+ }
+
+ @Override
+ public void run() {
if (!locksToAcquire.containsKey(path)) {
return;
}
- if (event.type() == TableModificationEvent.Type.ROW_DELETED) {
- List<LockRequest> existingRequests = locksToAcquire.get(path);
- if (existingRequests == null) {
- return;
- }
+ List<LockRequest> existingRequests = locksToAcquire.get(path);
+ if (existingRequests == null || existingRequests.isEmpty()) {
+ return;
+ }
+ log.info("Path {} is now available for locking. There are {} outstanding "
+ + "requests for it.",
+ path, existingRequests.size());
- synchronized (existingRequests) {
-
- Iterator<LockRequest> existingRequestIterator = existingRequests
- .iterator();
- while (existingRequestIterator.hasNext()) {
- LockRequest request = existingRequestIterator.next();
- if (request.expirationTime().isAfter(DateTime.now())) {
+ synchronized (existingRequests) {
+ Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
+ while (existingRequestIterator.hasNext()) {
+ LockRequest request = existingRequestIterator.next();
+ if (DateTime.now().isAfter(request.requestExpirationTime())) {
+ // request expired.
+ existingRequestIterator.remove();
+ } else {
+ if (request.lock().tryLock(request.leaseDurationMillis())) {
+ request.future().complete(DateTime.now().plusMillis(request.leaseDurationMillis()));
existingRequestIterator.remove();
- } else {
- if (request.lock().tryLock(
- request.leaseDurationMillis())) {
- request.future().complete(null);
- existingRequestIterator.remove();
- }
}
}
}
@@ -131,16 +157,15 @@
private class LockRequest {
private final Lock lock;
- private final DateTime expirationTime;
+ private final DateTime requestExpirationTime;
private final int leaseDurationMillis;
- private final CompletableFuture<Void> future;
+ private final CompletableFuture<DateTime> future;
public LockRequest(Lock lock, long waitTimeMillis,
- int leaseDurationMillis, CompletableFuture<Void> future) {
+ int leaseDurationMillis, CompletableFuture<DateTime> future) {
this.lock = lock;
- this.expirationTime = DateTime.now().plusMillis(
- (int) waitTimeMillis);
+ this.requestExpirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
this.leaseDurationMillis = leaseDurationMillis;
this.future = future;
}
@@ -149,15 +174,15 @@
return lock;
}
- public DateTime expirationTime() {
- return expirationTime;
+ public DateTime requestExpirationTime() {
+ return requestExpirationTime;
}
public int leaseDurationMillis() {
return leaseDurationMillis;
}
- public CompletableFuture<Void> future() {
+ public CompletableFuture<DateTime> future() {
return future;
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
index aa0dd7c..9ca5494 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
@@ -86,7 +86,7 @@
}
@Override
- public List<Long> appendEntries(List<Entry> entries) {
+ public synchronized List<Long> appendEntries(List<Entry> entries) {
assertIsOpen();
checkArgument(entries != null, "expecting non-null entries");
final List<Long> indices = new ArrayList<>(entries.size());