1. Fixed a synchronization issue with database update processing and expiry tracking.
2. Fixed a synchronization issue with the MapDBLog appendEntries method.
3. DatabaseClient now uses ProtocolClient to interact with the Raft cluster.
4. Misc. javadoc and logging improvements.
Change-Id: I147eb5bf859cf9827df452d62ab415d643a00aa4
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 5958bb7..f6ea217 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -1,5 +1,6 @@
package org.onlab.onos.store.service.impl;
+import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.ByteArrayInputStream;
@@ -9,6 +10,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
@@ -49,6 +52,9 @@
private final Logger log = getLogger(getClass());
+ private final ExecutorService updatesExecutor =
+ Executors.newSingleThreadExecutor(namedThreads("database-statemachine-updates"));
+
// message subject for database update notifications.
public static final MessageSubject DATABASE_UPDATE_EVENTS =
new MessageSubject("database-update-events");
@@ -88,8 +94,7 @@
}
@Command
- public boolean createTableWithExpiration(String tableName) {
- int ttlMillis = 10000;
+ public boolean createTable(String tableName, Integer ttlMillis) {
TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
return createTable(metadata);
}
@@ -100,18 +105,32 @@
return false;
}
state.createTable(metadata);
- for (DatabaseUpdateEventListener listener : listeners) {
- listener.tableCreated(metadata);
- }
+
+ updatesExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.tableCreated(metadata);
+ }
+ }
+ });
+
return true;
}
@Command
public boolean dropTable(String tableName) {
if (state.removeTable(tableName)) {
- for (DatabaseUpdateEventListener listener : listeners) {
- listener.tableDeleted(tableName);
- }
+
+ updatesExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.tableDeleted(tableName);
+ }
+ }
+ });
+
return true;
}
return false;
@@ -121,11 +140,18 @@
public boolean dropAllTables() {
Set<String> tableNames = state.getTableNames();
state.removeAllTables();
- for (DatabaseUpdateEventListener listener : listeners) {
- for (String tableName : tableNames) {
- listener.tableDeleted(tableName);
+
+ updatesExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ for (String tableName : tableNames) {
+ listener.tableDeleted(tableName);
+ }
+ }
}
- }
+ });
+
return true;
}
@@ -273,12 +299,18 @@
}
// notify listeners of table mod events.
- for (DatabaseUpdateEventListener listener : listeners) {
- for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
- log.trace("Publishing table modification event: {}", tableModificationEvent);
- listener.tableModified(tableModificationEvent);
+
+ updatesExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
+ log.trace("Publishing table modification event: {}", tableModificationEvent);
+ listener.tableModified(tableModificationEvent);
+ }
+ }
}
- }
+ });
return results;
}
@@ -397,10 +429,15 @@
this.state = SERIALIZER.decode(data);
}
- // FIXME: synchronize.
- for (DatabaseUpdateEventListener listener : listeners) {
- listener.snapshotInstalled(state);
- }
+ updatesExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.snapshotInstalled(state);
+ }
+ }
+ });
+
} catch (Exception e) {
log.error("Failed to install from snapshot", e);
throw new SnapshotException(e);