Bugfixes for DistributedLockManager functionality
Added a method called broadcastIncludeSelf to ClusterCommunicationService.
Cosmetic improvements: added toString methods
Change-Id: I1d58720c29e6f8642f950670c3a6d95a7019a491
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 0021eb7..a6320ed 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -3,6 +3,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -37,7 +38,7 @@
public boolean createTable(String tableName, int ttlMillis) {
- CompletableFuture<Boolean> future = copycat.submit("createTable", tableName, ttlMillis);
+ CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName);
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
@@ -65,9 +66,9 @@
}
}
- public List<String> listTables() {
+ public Set<String> listTables() {
- CompletableFuture<List<String>> future = copycat.submit("listTables");
+ CompletableFuture<Set<String>> future = copycat.submit("listTables");
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
index 157d3a6..62cf584 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
@@ -33,7 +33,6 @@
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
@@ -51,9 +50,6 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- public static final MessageSubject DATABASE_UPDATES = new MessageSubject(
- "database-update-event");
-
private final DatabaseService databaseService;
private final ClusterCommunicationService clusterCommunicator;
@@ -61,9 +57,9 @@
private final ControllerNode localNode;
private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
- private final Map<String, Map<DatabaseRow, VersionedValue>> tableEntryExpirationMap = new HashMap<>();
+ private final Map<String, Map<DatabaseRow, Long>> tableEntryExpirationMap = new HashMap<>();
- private final ExpirationListener<DatabaseRow, VersionedValue> expirationObserver = new ExpirationObserver();
+ private final ExpirationListener<DatabaseRow, Long> expirationObserver = new ExpirationObserver();
DatabaseEntryExpirationTracker(
Member localMember,
@@ -78,31 +74,38 @@
@Override
public void tableModified(TableModificationEvent event) {
+ log.debug("Received a table modification event {}", event);
+
if (!tableEntryExpirationMap.containsKey(event.tableName())) {
return;
}
+ Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(event.tableName());
DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
- Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
- .get(event.tableName());
+ Long eventVersion = event.value().version();
switch (event.type()) {
case ROW_DELETED:
+ map.remove(row, eventVersion);
if (isLocalMemberLeader.get()) {
try {
+ // FIXME: The broadcast message should be sent to self.
clusterCommunicator.broadcast(new ClusterMessage(
- localNode.id(), DATABASE_UPDATES,
+ localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
DatabaseStateMachine.SERIALIZER.encode(event)));
} catch (IOException e) {
- log.error(
- "Failed to broadcast a database table modification event.",
- e);
+ log.error("Failed to broadcast a database row deleted event.", e);
}
}
break;
case ROW_ADDED:
case ROW_UPDATED:
- map.put(row, null);
+ // To account for potential reordering of notifications,
+ // check to make sure we are replacing an old version with a new version
+ Long currentVersion = map.get(row);
+ if (currentVersion == null || currentVersion < eventVersion) {
+ map.put(row, eventVersion);
+ }
break;
default:
break;
@@ -111,60 +114,56 @@
@Override
public void tableCreated(TableMetadata metadata) {
+ log.debug("Received a table created event {}", metadata);
if (metadata.expireOldEntries()) {
tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
- .expiration(metadata.ttlMillis(), TimeUnit.SECONDS)
+ .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
.expirationListener(expirationObserver)
- // FIXME: make the expiration policy configurable.
+ // TODO: make the expiration policy configurable.
+ // Do we need to support expiration based on last access time?
.expirationPolicy(ExpirationPolicy.CREATED).build());
}
}
@Override
public void tableDeleted(String tableName) {
+ log.debug("Received a table deleted event for table ({})", tableName);
tableEntryExpirationMap.remove(tableName);
}
private class ExpirationObserver implements
- ExpirationListener<DatabaseRow, VersionedValue> {
+ ExpirationListener<DatabaseRow, Long> {
@Override
- public void expired(DatabaseRow key, VersionedValue value) {
+ public void expired(DatabaseRow row, Long version) {
+ Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(row.tableName);
try {
if (isLocalMemberLeader.get()) {
- if (!databaseService.removeIfVersionMatches(key.tableName,
- key.key, value.version())) {
- log.info("Entry in the database changed before right its TTL expiration.");
+ if (!databaseService.removeIfVersionMatches(row.tableName,
+ row.key, version)) {
+ log.info("Entry in database was updated right before its expiration.");
+ } else {
+ log.info("Successfully expired old entry with key ({}) from table ({})",
+ row.key, row.tableName);
}
} else {
- // If this node is not the current leader, we should never
- // let the expiring entries drop off
- // Under stable conditions (i.e no leadership switch) the
- // current leader will initiate
- // a database remove and this instance will get notified
- // of a tableModification event causing it to remove from
- // the map.
- Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
- .get(key.tableName);
+ // Only the current leader will expire keys from database.
+ // Everyone else function as standby just in case they need to take over
if (map != null) {
- map.put(key, value);
+ map.putIfAbsent(row, version);
}
}
} catch (Exception e) {
- log.warn(
- "Failed to delete entry from the database after ttl expiration. Will retry eviction",
- e);
- tableEntryExpirationMap.get(key.tableName).put(
- new DatabaseRow(key.tableName, key.key), value);
+ log.warn("Failed to delete entry from the database after ttl "
+ + "expiration. Operation will be retried.", e);
+ map.putIfAbsent(row, version);
}
}
}
@Override
public void handle(LeaderElectEvent event) {
- if (localMember.equals(event.leader())) {
- isLocalMemberLeader.set(true);
- }
+ isLocalMemberLeader.set(localMember.equals(event.leader()));
}
/**
@@ -212,12 +211,12 @@
continue;
}
- Map<DatabaseRow, VersionedValue> tableExpirationMap = ExpiringMap.builder()
+ Map<DatabaseRow, Long> tableExpirationMap = ExpiringMap.builder()
.expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
.expirationListener(expirationObserver)
.expirationPolicy(ExpirationPolicy.CREATED).build();
for (Map.Entry<String, VersionedValue> entry : state.getTable(tableName).entrySet()) {
- tableExpirationMap.put(new DatabaseRow(tableName, entry.getKey()), entry.getValue());
+ tableExpirationMap.put(new DatabaseRow(tableName, entry.getKey()), entry.getValue().version());
}
tableEntryExpirationMap.put(tableName, tableExpirationMap);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index e233e5a..ad1ef853 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -7,7 +7,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -19,6 +18,7 @@
import net.kuujo.copycat.cluster.TcpCluster;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.event.LeaderElectEvent;
import net.kuujo.copycat.log.Log;
import org.apache.felix.scr.annotations.Activate;
@@ -160,18 +160,22 @@
}
log.info("Starting cluster: {}", cluster);
+ DatabaseEntryExpirationTracker expirationTracker =
+ new DatabaseEntryExpirationTracker(
+ clusterConfig.getLocalMember(),
+ clusterService.getLocalNode(),
+ clusterCommunicator,
+ this);
DatabaseStateMachine stateMachine = new DatabaseStateMachine();
- stateMachine.addEventListener(
- new DatabaseEntryExpirationTracker(
- clusterConfig.getLocalMember(),
- clusterService.getLocalNode(),
- clusterCommunicator,
- this));
+ stateMachine.addEventListener(expirationTracker);
Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
ClusterMessagingProtocol.SERIALIZER);
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
+
+ copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
+
copycat.start();
client = new DatabaseClient(copycat);
@@ -207,7 +211,7 @@
}
@Override
- public List<String> listTables() {
+ public Set<String> listTables() {
return client.listTables();
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 3b0d874..cdf66af 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -30,6 +30,7 @@
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -65,6 +66,7 @@
.register(WriteStatus.class)
// TODO: Move this out ?
.register(TableModificationEvent.class)
+ .register(TableModificationEvent.Type.class)
.register(ClusterMessagingProtocol.COMMON)
.build()
.populate(1);
@@ -85,7 +87,8 @@
}
@Command
- public boolean createTable(String tableName, int ttlMillis) {
+ public boolean createTableWithExpiration(String tableName) {
+ int ttlMillis = 10000;
TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
return createTable(metadata);
}
@@ -266,6 +269,7 @@
// notify listeners of table mod events.
for (DatabaseUpdateEventListener listener : listeners) {
for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
+ log.info("Publishing table modification event: {}", tableModificationEvent);
listener.tableModified(tableModificationEvent);
}
}
@@ -345,6 +349,15 @@
public int ttlMillis() {
return ttlMillis;
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("tableName", tableName)
+ .add("expireOldEntries", expireOldEntries)
+ .add("ttlMillis", ttlMillis)
+ .toString();
+ }
}
@Override
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
index c466941..cef53a5 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -80,6 +80,7 @@
return false;
}
}
+ isLocked.set(true);
lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
return true;
}
@@ -95,9 +96,11 @@
if (DateTime.now().isAfter(lockExpirationTime)) {
isLocked.set(false);
return false;
+ } else {
+ return true;
}
}
- return true;
+ return false;
}
@Override
@@ -105,6 +108,7 @@
if (!isLocked()) {
return;
} else {
+ isLocked.set(false);
databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
index 6d99ba7..c8b16e4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -94,6 +94,8 @@
return;
}
+ log.info("Received a lock available event for path: {}", event.key());
+
String path = event.key();
if (!locksToAcquire.containsKey(path)) {
return;
@@ -159,4 +161,4 @@
return future;
}
}
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
index b0962dc..31dcaab 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
@@ -2,6 +2,8 @@
import org.onlab.onos.store.service.VersionedValue;
+import com.google.common.base.MoreObjects;
+
/**
* A table modification event.
*/
@@ -9,7 +11,6 @@
/**
* Type of table modification event.
- *
*/
public enum Type {
ROW_ADDED,
@@ -94,4 +95,14 @@
public Type type() {
return type;
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("type", type)
+ .add("tableName", tableName)
+ .add("key", key)
+ .add("version", value.version())
+ .toString();
+ }
}