[ONOS-7068] Upgrade to Atomix 2.0.4
Change-Id: I864567084b1ac230254533026f06d4a25e5e0a2d
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
index 40e9fa0..ab165bb 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import io.atomix.protocols.raft.cluster.MemberId;
@@ -27,6 +28,8 @@
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
import io.atomix.protocols.raft.protocol.CommandRequest;
import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
import io.atomix.protocols.raft.protocol.MetadataRequest;
@@ -87,6 +90,16 @@
}
@Override
+ public void registerHeartbeatHandler(Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> function) {
+ clusterCommunicator.addSubscriber(context.heartbeatSubject, serializer::decode, function, serializer::encode);
+ }
+
+ @Override
+ public void unregisterHeartbeatHandler() {
+ clusterCommunicator.removeSubscriber(context.heartbeatSubject);
+ }
+
+ @Override
public void reset(Collection<MemberId> members, ResetRequest request) {
Set<NodeId> nodes = members.stream().map(m -> NodeId.nodeId(m.id())).collect(Collectors.toSet());
clusterCommunicator.multicast(
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
index 2aecd0b..9f1f1bc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
@@ -22,6 +22,7 @@
*/
final class RaftMessageContext {
private final String prefix;
+ final MessageSubject heartbeatSubject;
final MessageSubject openSessionSubject;
final MessageSubject closeSessionSubject;
final MessageSubject keepAliveSubject;
@@ -40,6 +41,7 @@
RaftMessageContext(String prefix) {
this.prefix = prefix;
+ this.heartbeatSubject = getSubject(prefix, "heartbeat");
this.openSessionSubject = getSubject(prefix, "open");
this.closeSessionSubject = getSubject(prefix, "close");
this.keepAliveSubject = getSubject(prefix, "keep-alive");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index 9b8f3e6..08c5f48 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -29,6 +29,8 @@
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
@@ -154,6 +156,11 @@
}
@Override
+ public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId, HeartbeatRequest request) {
+ return sendAndReceive(context.heartbeatSubject, request, memberId);
+ }
+
+ @Override
public void registerOpenSessionHandler(
Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) {
clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
index 0333b00..8350499 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
@@ -39,6 +39,8 @@
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
@@ -106,6 +108,8 @@
.register(CloseSessionResponse.class)
.register(KeepAliveRequest.class)
.register(KeepAliveResponse.class)
+ .register(HeartbeatRequest.class)
+ .register(HeartbeatResponse.class)
.register(QueryRequest.class)
.register(QueryResponse.class)
.register(CommandRequest.class)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 8f1ffa3..a768a4f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -264,7 +264,8 @@
.withServiceType(DistributedPrimitive.Type.LEADER_ELECTOR.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
- .withTimeout(Duration.ofSeconds(5))
+ .withMinTimeout(Duration.ofSeconds(1))
+ .withMaxTimeout(Duration.ofSeconds(5))
.withMaxRetries(5)
.build()
.open()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index bb071a2..5e322d5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -42,7 +42,7 @@
private static final int MAX_SEGMENT_SIZE = 1024 * 1024 * 64;
private static final long ELECTION_TIMEOUT_MILLIS = 2500;
- private static final long HEARTBEAT_INTERVAL_MILLIS = 1000;
+ private static final long HEARTBEAT_INTERVAL_MILLIS = 250;
private final MemberId localMemberId;
private final StoragePartition partition;