Make various Raft server/client/storage options configurable
Change-Id: Ied90d25032593dd2a738761ec3bf45102a4c189f
(cherry picked from commit ff7c20cda3d80bac7c6f7bf4db2639e8255b3d95)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 7c8dee8..2318d08 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -69,6 +69,37 @@
private static final int MAX_RETRIES = 8;
private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
+ private static final String MIN_TIMEOUT_PROPERTY = "onos.cluster.raft.client.minTimeoutMillis";
+ private static final String MAX_TIMEOUT_PROPERTY = "onos.cluster.raft.client.maxTimeoutMillis";
+
+ private static final Duration MIN_TIMEOUT;
+ private static final Duration MAX_TIMEOUT;
+
+ private static final long DEFAULT_MIN_TIMEOUT_MILLIS = 5000;
+ private static final long DEFAULT_MAX_TIMEOUT_MILLIS = 30000;
+
+ static {
+ Duration minTimeout;
+ try {
+ minTimeout = Duration.ofMillis(Long.parseLong(
+ System.getProperty(MIN_TIMEOUT_PROPERTY,
+ String.valueOf(DEFAULT_MIN_TIMEOUT_MILLIS))));
+ } catch (NumberFormatException e) {
+ minTimeout = Duration.ofMillis(DEFAULT_MIN_TIMEOUT_MILLIS);
+ }
+ MIN_TIMEOUT = minTimeout;
+
+ Duration maxTimeout;
+ try {
+ maxTimeout = Duration.ofMillis(Long.parseLong(
+ System.getProperty(MAX_TIMEOUT_PROPERTY,
+ String.valueOf(DEFAULT_MAX_TIMEOUT_MILLIS))));
+ } catch (NumberFormatException e) {
+ maxTimeout = Duration.ofMillis(DEFAULT_MAX_TIMEOUT_MILLIS);
+ }
+ MAX_TIMEOUT = maxTimeout;
+ }
+
private final Logger log = getLogger(getClass());
private final StoragePartition partition;
@@ -113,7 +144,8 @@
.withServiceType(DistributedPrimitive.Type.CONSISTENT_MAP.name())
.withReadConsistency(ReadConsistency.SEQUENTIAL)
.withCommunicationStrategy(CommunicationStrategy.ANY)
- .withTimeout(Duration.ofSeconds(30))
+ .withMinTimeout(MIN_TIMEOUT)
+ .withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
.build()
.open()
@@ -138,7 +170,8 @@
.withServiceType(DistributedPrimitive.Type.CONSISTENT_TREEMAP.name())
.withReadConsistency(ReadConsistency.SEQUENTIAL)
.withCommunicationStrategy(CommunicationStrategy.ANY)
- .withTimeout(Duration.ofSeconds(30))
+ .withMinTimeout(MIN_TIMEOUT)
+ .withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
.build()
.open()
@@ -162,7 +195,8 @@
.withServiceType(DistributedPrimitive.Type.CONSISTENT_MULTIMAP.name())
.withReadConsistency(ReadConsistency.SEQUENTIAL)
.withCommunicationStrategy(CommunicationStrategy.ANY)
- .withTimeout(Duration.ofSeconds(30))
+ .withMinTimeout(MIN_TIMEOUT)
+ .withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
.build()
.open()
@@ -192,7 +226,8 @@
.withServiceType(DistributedPrimitive.Type.COUNTER_MAP.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
- .withTimeout(Duration.ofSeconds(30))
+ .withMinTimeout(MIN_TIMEOUT)
+ .withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
.build()
.open()
@@ -214,7 +249,8 @@
.withServiceType(DistributedPrimitive.Type.COUNTER.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
- .withTimeout(Duration.ofSeconds(30))
+ .withMinTimeout(MIN_TIMEOUT)
+ .withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
.build()
.open()
@@ -238,7 +274,8 @@
.withServiceType(DistributedPrimitive.Type.WORK_QUEUE.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
- .withTimeout(Duration.ofSeconds(5))
+ .withMinTimeout(MIN_TIMEOUT)
+ .withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
.build()
.open()
@@ -253,7 +290,8 @@
.withServiceType(String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), ordering))
.withReadConsistency(ReadConsistency.SEQUENTIAL)
.withCommunicationStrategy(CommunicationStrategy.ANY)
- .withTimeout(Duration.ofSeconds(30))
+ .withMinTimeout(MIN_TIMEOUT)
+ .withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
.build()
.open()
@@ -268,8 +306,8 @@
.withServiceType(DistributedPrimitive.Type.LOCK.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
- .withMinTimeout(Duration.ofSeconds(1))
- .withMaxTimeout(Duration.ofSeconds(5))
+ .withMinTimeout(MIN_TIMEOUT)
+ .withMaxTimeout(MIN_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
.build()
.open()
@@ -284,7 +322,7 @@
.withReadConsistency(ReadConsistency.LINEARIZABLE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
.withMinTimeout(Duration.ofMillis(timeUnit.toMillis(leaderTimeout)))
- .withMaxTimeout(Duration.ofSeconds(5))
+ .withMaxTimeout(MIN_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
.build()
.open()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index eb48f66..f2600e8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -47,10 +47,101 @@
private final Logger log = getLogger(getClass());
- private static final int MAX_SEGMENT_SIZE = 1024 * 1024 * 64;
- private static final long ELECTION_TIMEOUT_MILLIS = 2500;
- private static final int ELECTION_THRESHOLD = 5;
- private static final long HEARTBEAT_INTERVAL_MILLIS = 500;
+ private static final String ELECTION_TIMEOUT_MILLIS_PROPERTY = "onos.cluster.raft.electionTimeoutMillis";
+ private static final String ELECTION_THRESHOLD_PROPERTY = "onos.cluster.raft.electionFailureThreshold";
+ private static final String SESSION_THRESHOLD_PROPERTY = "onos.cluster.raft.sessionFailureThreshold";
+ private static final String HEARTBEAT_INTERVAL_MILLIS_PROPERTY = "onos.cluster.raft.heartbeatIntervalMillis";
+ private static final String MAX_SEGMENT_SIZE_PROPERTY = "onos.cluster.raft.storage.maxSegmentSize";
+ private static final String STORAGE_LEVEL_PROPERTY = "onos.cluster.raft.storage.level";
+ private static final String FLUSH_ON_COMMIT_PROPERTY = "onos.cluster.raft.storage.flushOnCommit";
+
+ private static final long ELECTION_TIMEOUT_MILLIS;
+ private static final int ELECTION_THRESHOLD;
+ private static final int SESSION_THRESHOLD;
+ private static final long HEARTBEAT_INTERVAL_MILLIS;
+ private static final int MAX_SEGMENT_SIZE;
+ private static final StorageLevel STORAGE_LEVEL;
+ private static final boolean FLUSH_ON_COMMIT;
+
+ private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 64;
+ private static final long DEFAULT_ELECTION_TIMEOUT_MILLIS = 2500;
+ private static final int DEFAULT_ELECTION_THRESHOLD = 12;
+ private static final int DEFAULT_SESSION_THRESHOLD = 10;
+ private static final long DEFAULT_HEARTBEAT_INTERVAL_MILLIS = 500;
+ private static final StorageLevel DEFAULT_STORAGE_LEVEL = StorageLevel.MAPPED;
+ private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
+
+ static {
+ int maxSegmentSize;
+ try {
+ maxSegmentSize = Integer.parseInt(System.getProperty(
+ MAX_SEGMENT_SIZE_PROPERTY,
+ String.valueOf(DEFAULT_MAX_SEGMENT_SIZE)));
+ } catch (NumberFormatException e) {
+ maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
+ }
+ MAX_SEGMENT_SIZE = maxSegmentSize;
+
+ long electionTimeoutMillis;
+ try {
+ electionTimeoutMillis = Long.parseLong(System.getProperty(
+ ELECTION_TIMEOUT_MILLIS_PROPERTY,
+ String.valueOf(DEFAULT_ELECTION_TIMEOUT_MILLIS)));
+ } catch (NumberFormatException e) {
+ electionTimeoutMillis = DEFAULT_ELECTION_TIMEOUT_MILLIS;
+ }
+ ELECTION_TIMEOUT_MILLIS = electionTimeoutMillis;
+
+ int electionFailureThreshold;
+ try {
+ electionFailureThreshold = Integer.parseInt(System.getProperty(
+ ELECTION_THRESHOLD_PROPERTY,
+ String.valueOf(DEFAULT_ELECTION_THRESHOLD)));
+ } catch (NumberFormatException e) {
+ electionFailureThreshold = DEFAULT_ELECTION_THRESHOLD;
+ }
+ ELECTION_THRESHOLD = electionFailureThreshold;
+
+ int sessionFailureThreshold;
+ try {
+ sessionFailureThreshold = Integer.parseInt(System.getProperty(
+ SESSION_THRESHOLD_PROPERTY,
+ String.valueOf(DEFAULT_SESSION_THRESHOLD)));
+ } catch (NumberFormatException e) {
+ sessionFailureThreshold = DEFAULT_SESSION_THRESHOLD;
+ }
+ SESSION_THRESHOLD = sessionFailureThreshold;
+
+ long heartbeatIntervalMillis;
+ try {
+ heartbeatIntervalMillis = Long.parseLong(System.getProperty(
+ HEARTBEAT_INTERVAL_MILLIS_PROPERTY,
+ String.valueOf(DEFAULT_HEARTBEAT_INTERVAL_MILLIS)));
+ } catch (NumberFormatException e) {
+ heartbeatIntervalMillis = DEFAULT_HEARTBEAT_INTERVAL_MILLIS;
+ }
+ HEARTBEAT_INTERVAL_MILLIS = heartbeatIntervalMillis;
+
+ StorageLevel storageLevel;
+ try {
+ storageLevel = StorageLevel.valueOf(System.getProperty(
+ STORAGE_LEVEL_PROPERTY,
+ DEFAULT_STORAGE_LEVEL.name()).toUpperCase());
+ } catch (IllegalArgumentException e) {
+ storageLevel = DEFAULT_STORAGE_LEVEL;
+ }
+ STORAGE_LEVEL = storageLevel;
+
+ boolean flushOnCommit;
+ try {
+ flushOnCommit = Boolean.parseBoolean(System.getProperty(
+ FLUSH_ON_COMMIT_PROPERTY,
+ String.valueOf(DEFAULT_FLUSH_ON_COMMIT)));
+ } catch (Exception e) {
+ flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
+ }
+ FLUSH_ON_COMMIT = flushOnCommit;
+ }
private final MemberId localMemberId;
private final StoragePartition partition;
@@ -145,10 +236,11 @@
.withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
.withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
.withElectionThreshold(ELECTION_THRESHOLD)
+ .withSessionFailureThreshold(SESSION_THRESHOLD)
.withStorage(RaftStorage.newBuilder()
.withPrefix(String.format("partition-%s", partition.getId()))
- .withStorageLevel(StorageLevel.DISK)
- .withFlushOnCommit()
+ .withStorageLevel(STORAGE_LEVEL)
+ .withFlushOnCommit(FLUSH_ON_COMMIT)
.withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
.withDirectory(partition.getDataFolder())
.withMaxSegmentSize(MAX_SEGMENT_SIZE)
@@ -204,10 +296,11 @@
.withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
.withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
.withElectionThreshold(ELECTION_THRESHOLD)
+ .withSessionFailureThreshold(SESSION_THRESHOLD)
.withStorage(RaftStorage.newBuilder()
.withPrefix(String.format("partition-%s", partition.getId()))
- .withStorageLevel(StorageLevel.MAPPED)
- .withFlushOnCommit(false)
+ .withStorageLevel(STORAGE_LEVEL)
+ .withFlushOnCommit(FLUSH_ON_COMMIT)
.withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
.withDirectory(partition.getDataFolder())
.withMaxSegmentSize(MAX_SEGMENT_SIZE)