[ONOS-7068] Upgrade to Atomix 2.0.4

Change-Id: I864567084b1ac230254533026f06d4a25e5e0a2d
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
index 40e9fa0..ab165bb 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -20,6 +20,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import io.atomix.protocols.raft.cluster.MemberId;
@@ -27,6 +28,8 @@
 import io.atomix.protocols.raft.protocol.CloseSessionResponse;
 import io.atomix.protocols.raft.protocol.CommandRequest;
 import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
 import io.atomix.protocols.raft.protocol.KeepAliveRequest;
 import io.atomix.protocols.raft.protocol.KeepAliveResponse;
 import io.atomix.protocols.raft.protocol.MetadataRequest;
@@ -87,6 +90,16 @@
     }
 
     @Override
+    public void registerHeartbeatHandler(Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> function) {
+        clusterCommunicator.addSubscriber(context.heartbeatSubject, serializer::decode, function, serializer::encode);
+    }
+
+    @Override
+    public void unregisterHeartbeatHandler() {
+        clusterCommunicator.removeSubscriber(context.heartbeatSubject);
+    }
+
+    @Override
     public void reset(Collection<MemberId> members, ResetRequest request) {
         Set<NodeId> nodes = members.stream().map(m -> NodeId.nodeId(m.id())).collect(Collectors.toSet());
         clusterCommunicator.multicast(
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
index 2aecd0b..9f1f1bc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
@@ -22,6 +22,7 @@
  */
 final class RaftMessageContext {
     private final String prefix;
+    final MessageSubject heartbeatSubject;
     final MessageSubject openSessionSubject;
     final MessageSubject closeSessionSubject;
     final MessageSubject keepAliveSubject;
@@ -40,6 +41,7 @@
 
     RaftMessageContext(String prefix) {
         this.prefix = prefix;
+        this.heartbeatSubject = getSubject(prefix, "heartbeat");
         this.openSessionSubject = getSubject(prefix, "open");
         this.closeSessionSubject = getSubject(prefix, "close");
         this.keepAliveSubject = getSubject(prefix, "keep-alive");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index 9b8f3e6..08c5f48 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -29,6 +29,8 @@
 import io.atomix.protocols.raft.protocol.CommandResponse;
 import io.atomix.protocols.raft.protocol.ConfigureRequest;
 import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
 import io.atomix.protocols.raft.protocol.InstallRequest;
 import io.atomix.protocols.raft.protocol.InstallResponse;
 import io.atomix.protocols.raft.protocol.JoinRequest;
@@ -154,6 +156,11 @@
     }
 
     @Override
+    public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId, HeartbeatRequest request) {
+        return sendAndReceive(context.heartbeatSubject, request, memberId);
+    }
+
+    @Override
     public void registerOpenSessionHandler(
             Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) {
         clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
index 0333b00..8350499 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
@@ -39,6 +39,8 @@
 import io.atomix.protocols.raft.protocol.CommandResponse;
 import io.atomix.protocols.raft.protocol.ConfigureRequest;
 import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
 import io.atomix.protocols.raft.protocol.InstallRequest;
 import io.atomix.protocols.raft.protocol.InstallResponse;
 import io.atomix.protocols.raft.protocol.JoinRequest;
@@ -106,6 +108,8 @@
             .register(CloseSessionResponse.class)
             .register(KeepAliveRequest.class)
             .register(KeepAliveResponse.class)
+            .register(HeartbeatRequest.class)
+            .register(HeartbeatResponse.class)
             .register(QueryRequest.class)
             .register(QueryResponse.class)
             .register(CommandRequest.class)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 8f1ffa3..a768a4f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -264,7 +264,8 @@
                 .withServiceType(DistributedPrimitive.Type.LEADER_ELECTOR.name())
                 .withReadConsistency(ReadConsistency.LINEARIZABLE)
                 .withCommunicationStrategy(CommunicationStrategy.LEADER)
-                .withTimeout(Duration.ofSeconds(5))
+                .withMinTimeout(Duration.ofSeconds(1))
+                .withMaxTimeout(Duration.ofSeconds(5))
                 .withMaxRetries(5)
                 .build()
                 .open()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index bb071a2..5e322d5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -42,7 +42,7 @@
 
     private static final int MAX_SEGMENT_SIZE = 1024 * 1024 * 64;
     private static final long ELECTION_TIMEOUT_MILLIS = 2500;
-    private static final long HEARTBEAT_INTERVAL_MILLIS = 1000;
+    private static final long HEARTBEAT_INTERVAL_MILLIS = 250;
 
     private final MemberId localMemberId;
     private final StoragePartition partition;