Merge "Support for expiring Database entries"
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
index 40d12ff..fa73546 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
@@ -20,6 +20,16 @@
public boolean createTable(String name);
/**
+ * Creates a new table where last update time will be used to track and expire old entries.
+ * Table creation is idempotent. Attempting to create a table
+ * that already exists will be a noop.
+ * @param name table name.
+ * @param ttlMillis total duration in millis since last update time when entries will be expired.
+ * @return true if the table was created by this call, false otherwise.
+ */
+ public boolean createTable(String name, int ttlMillis);
+
+ /**
* Lists all the tables in the database.
* @return list of table names.
*/
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 77ff062..0021eb7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -35,6 +35,16 @@
}
}
+ public boolean createTable(String tableName, int ttlMillis) {
+
+ CompletableFuture<Boolean> future = copycat.submit("createTable", tableName, ttlMillis);
+ try {
+ return future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
public void dropTable(String tableName) {
CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
index c9775ca..157d3a6 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
@@ -30,12 +30,14 @@
import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
-import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.VersionedValue;
+import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
+import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,23 +54,34 @@
public static final MessageSubject DATABASE_UPDATES = new MessageSubject(
"database-update-event");
- private DatabaseService databaseService;
- private ClusterService cluster;
- private ClusterCommunicationService clusterCommunicator;
+ private final DatabaseService databaseService;
+ private final ClusterCommunicationService clusterCommunicator;
private final Member localMember;
+ private final ControllerNode localNode;
private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
private final Map<String, Map<DatabaseRow, VersionedValue>> tableEntryExpirationMap = new HashMap<>();
private final ExpirationListener<DatabaseRow, VersionedValue> expirationObserver = new ExpirationObserver();
- DatabaseEntryExpirationTracker(Member localMember) {
+ DatabaseEntryExpirationTracker(
+ Member localMember,
+ ControllerNode localNode,
+ ClusterCommunicationService clusterCommunicator,
+ DatabaseService databaseService) {
this.localMember = localMember;
+ this.localNode = localNode;
+ this.clusterCommunicator = clusterCommunicator;
+ this.databaseService = databaseService;
}
@Override
public void tableModified(TableModificationEvent event) {
+ if (!tableEntryExpirationMap.containsKey(event.tableName())) {
+ return;
+ }
+
DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
.get(event.tableName());
@@ -77,8 +90,8 @@
case ROW_DELETED:
if (isLocalMemberLeader.get()) {
try {
- clusterCommunicator.broadcast(new ClusterMessage(cluster
- .getLocalNode().id(), DATABASE_UPDATES,
+ clusterCommunicator.broadcast(new ClusterMessage(
+ localNode.id(), DATABASE_UPDATES,
DatabaseStateMachine.SERIALIZER.encode(event)));
} catch (IOException e) {
log.error(
@@ -97,12 +110,10 @@
}
@Override
- public void tableCreated(String tableName, int expirationTimeMillis) {
- // make this explicit instead of relying on a negative value
- // to indicate no expiration.
- if (expirationTimeMillis > 0) {
- tableEntryExpirationMap.put(tableName, ExpiringMap.builder()
- .expiration(expirationTimeMillis, TimeUnit.SECONDS)
+ public void tableCreated(TableMetadata metadata) {
+ if (metadata.expireOldEntries()) {
+ tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
+ .expiration(metadata.ttlMillis(), TimeUnit.SECONDS)
.expirationListener(expirationObserver)
// FIXME: make the expiration policy configurable.
.expirationPolicy(ExpirationPolicy.CREATED).build());
@@ -188,4 +199,28 @@
return Objects.hash(tableName, key);
}
}
+
+ @Override
+ public void snapshotInstalled(State state) {
+ if (!tableEntryExpirationMap.isEmpty()) {
+ return;
+ }
+ for (String tableName : state.getTableNames()) {
+
+ TableMetadata metadata = state.getTableMetadata(tableName);
+ if (!metadata.expireOldEntries()) {
+ continue;
+ }
+
+ Map<DatabaseRow, VersionedValue> tableExpirationMap = ExpiringMap.builder()
+ .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
+ .expirationListener(expirationObserver)
+ .expirationPolicy(ExpirationPolicy.CREATED).build();
+ for (Map.Entry<String, VersionedValue> entry : state.getTable(tableName).entrySet()) {
+ tableExpirationMap.put(new DatabaseRow(tableName, entry.getKey()), entry.getValue());
+ }
+
+ tableEntryExpirationMap.put(tableName, tableExpirationMap);
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index b2fe19f..e233e5a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -14,7 +14,6 @@
import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.Copycat;
-import net.kuujo.copycat.StateMachine;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.TcpCluster;
@@ -34,6 +33,7 @@
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchReadResult;
import org.onlab.onos.store.service.BatchWriteRequest;
@@ -65,6 +65,9 @@
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DatabaseProtocolService copycatMessagingProtocol;
// FIXME: point to appropriate path
@@ -158,7 +161,13 @@
log.info("Starting cluster: {}", cluster);
- StateMachine stateMachine = new DatabaseStateMachine();
+ DatabaseStateMachine stateMachine = new DatabaseStateMachine();
+ stateMachine.addEventListener(
+ new DatabaseEntryExpirationTracker(
+ clusterConfig.getLocalMember(),
+ clusterService.getLocalNode(),
+ clusterCommunicator,
+ this));
Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
ClusterMessagingProtocol.SERIALIZER);
@@ -183,6 +192,11 @@
}
@Override
+ public boolean createTable(String name, int ttlMillis) {
+ return client.createTable(name, ttlMillis);
+ }
+
+ @Override
public void dropTable(String name) {
client.dropTable(name);
}
@@ -418,4 +432,4 @@
}
return null;
}
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 62a06b4..3b0d874 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -30,9 +30,10 @@
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
/**
@@ -57,6 +58,7 @@
serializerPool = KryoNamespace.newBuilder()
.register(VersionedValue.class)
.register(State.class)
+ .register(TableMetadata.class)
.register(BatchReadRequest.class)
.register(BatchWriteRequest.class)
.register(ReadStatus.class)
@@ -69,7 +71,7 @@
}
};
- private final List<DatabaseUpdateEventListener> listeners = Lists.newLinkedList();
+ private final Set<DatabaseUpdateEventListener> listeners = Sets.newIdentityHashSet();
// durable internal state of the database.
private State state = new State();
@@ -78,34 +80,31 @@
@Command
public boolean createTable(String tableName) {
- Map<String, VersionedValue> existingTable =
- state.getTables().putIfAbsent(tableName, Maps.newHashMap());
- if (existingTable == null) {
- for (DatabaseUpdateEventListener listener : listeners) {
- listener.tableCreated(tableName, Integer.MAX_VALUE);
- }
- return true;
- }
- return false;
+ TableMetadata metadata = new TableMetadata(tableName);
+ return createTable(metadata);
}
@Command
- public boolean createTable(String tableName, int expirationTimeMillis) {
- Map<String, VersionedValue> existingTable =
- state.getTables().putIfAbsent(tableName, Maps.newHashMap());
- if (existingTable == null) {
- for (DatabaseUpdateEventListener listener : listeners) {
- listener.tableCreated(tableName, expirationTimeMillis);
- }
- return true;
+ public boolean createTable(String tableName, int ttlMillis) {
+ TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
+ return createTable(metadata);
+ }
+
+ private boolean createTable(TableMetadata metadata) {
+ Map<String, VersionedValue> existingTable = state.getTable(metadata.tableName());
+ if (existingTable != null) {
+ return false;
}
- return false;
+ state.createTable(metadata);
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.tableCreated(metadata);
+ }
+ return true;
}
@Command
public boolean dropTable(String tableName) {
- Map<String, VersionedValue> table = state.getTables().remove(tableName);
- if (table != null) {
+ if (state.removeTable(tableName)) {
for (DatabaseUpdateEventListener listener : listeners) {
listener.tableDeleted(tableName);
}
@@ -116,8 +115,8 @@
@Command
public boolean dropAllTables() {
- Set<String> tableNames = state.getTables().keySet();
- state.getTables().clear();
+ Set<String> tableNames = state.getTableNames();
+ state.removeAllTables();
for (DatabaseUpdateEventListener listener : listeners) {
for (String tableName : tableNames) {
listener.tableDeleted(tableName);
@@ -127,15 +126,15 @@
}
@Query
- public List<String> listTables() {
- return ImmutableList.copyOf(state.getTables().keySet());
+ public Set<String> listTables() {
+ return ImmutableSet.copyOf(state.getTableNames());
}
@Query
public List<ReadResult> read(BatchReadRequest batchRequest) {
List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
for (ReadRequest request : batchRequest.getAsList()) {
- Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+ Map<String, VersionedValue> table = state.getTable(request.tableName());
if (table == null) {
results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
continue;
@@ -186,7 +185,7 @@
boolean abort = false;
List<WriteStatus> validationResults = new ArrayList<>(batchRequest.batchSize());
for (WriteRequest request : batchRequest.getAsList()) {
- Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+ Map<String, VersionedValue> table = state.getTable(request.tableName());
if (table == null) {
validationResults.add(WriteStatus.NO_SUCH_TABLE);
abort = true;
@@ -218,7 +217,7 @@
// apply changes
for (WriteRequest request : batchRequest.getAsList()) {
- Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+ Map<String, VersionedValue> table = state.getTable(request.tableName());
TableModificationEvent tableModificationEvent = null;
// FIXME: If this method could be called by multiple thread,
@@ -274,19 +273,78 @@
return results;
}
- public class State {
+ public static class State {
- private final Map<String, Map<String, VersionedValue>> tables =
- Maps.newHashMap();
+ private final Map<String, TableMetadata> tableMetadata = Maps.newHashMap();
+ private final Map<String, Map<String, VersionedValue>> tableData = Maps.newHashMap();
private long versionCounter = 1;
- Map<String, Map<String, VersionedValue>> getTables() {
- return tables;
+ public Map<String, VersionedValue> getTable(String tableName) {
+ return tableData.get(tableName);
+ }
+
+ void createTable(TableMetadata metadata) {
+ tableMetadata.put(metadata.tableName, metadata);
+ tableData.put(metadata.tableName, Maps.newHashMap());
+ }
+
+ TableMetadata getTableMetadata(String tableName) {
+ return tableMetadata.get(tableName);
}
long nextVersion() {
return versionCounter++;
}
+
+ Set<String> getTableNames() {
+ return ImmutableSet.copyOf(tableMetadata.keySet());
+ }
+
+
+ boolean removeTable(String tableName) {
+ if (!tableMetadata.containsKey(tableName)) {
+ return false;
+ }
+ tableMetadata.remove(tableName);
+ tableData.remove(tableName);
+ return true;
+ }
+
+ void removeAllTables() {
+ tableMetadata.clear();
+ tableData.clear();
+ }
+ }
+
+ public static class TableMetadata {
+ private final String tableName;
+ private final boolean expireOldEntries;
+ private final int ttlMillis;
+
+ public TableMetadata(String tableName) {
+ this.tableName = tableName;
+ this.expireOldEntries = false;
+ this.ttlMillis = Integer.MAX_VALUE;
+
+ }
+
+ public TableMetadata(String tableName, int ttlMillis) {
+ this.tableName = tableName;
+ this.expireOldEntries = true;
+ this.ttlMillis = ttlMillis;
+ }
+
+ public String tableName() {
+ return tableName;
+ }
+
+ public boolean expireOldEntries() {
+ return expireOldEntries;
+ }
+
+ public int ttlMillis() {
+ return ttlMillis;
+ }
}
@Override
@@ -319,13 +377,30 @@
} else {
this.state = SERIALIZER.decode(data);
}
+
+ // FIXME: synchronize.
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.snapshotInstalled(state);
+ }
} catch (Exception e) {
log.error("Failed to install from snapshot", e);
throw new SnapshotException(e);
}
}
+ /**
+ * Adds specified DatabaseUpdateEventListener.
+ * @param listener listener to add
+ */
public void addEventListener(DatabaseUpdateEventListener listener) {
listeners.add(listener);
}
+
+ /**
+ * Removes specified DatabaseUpdateEventListener.
+ * @param listener listener to remove
+ */
+ public void removeEventListener(DatabaseUpdateEventListener listener) {
+ listeners.remove(listener);
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
index 1dc0e9d..6eb8703 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
@@ -16,6 +16,8 @@
package org.onlab.onos.store.service.impl;
+import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
+
/**
* Interface of database update event listeners.
*/
@@ -29,14 +31,19 @@
/**
* Notifies listeners of a table created event.
- * @param tableName name of the table created
- * @param expirationTimeMillis TTL for entries added to the table (measured since last update time)
+ * @param metadata metadata for the created table.
*/
- public void tableCreated(String tableName, int expirationTimeMillis);
+ public void tableCreated(TableMetadata metadata);
/**
* Notifies listeners of a table deleted event.
* @param tableName name of the table deleted
*/
public void tableDeleted(String tableName);
-}
\ No newline at end of file
+
+ /**
+ * Notifies listeners of a snapshot installation event.
+ * @param snapshotState installed snapshot state.
+ */
+ public void snapshotInstalled(DatabaseStateMachine.State snapshotState);
+}