[ONOS-7068] Upgrade to Atomix 2.0.4
Change-Id: I864567084b1ac230254533026f06d4a25e5e0a2d
diff --git a/core/store/primitives/pom.xml b/core/store/primitives/pom.xml
index 3debfc8..f6966ea 100644
--- a/core/store/primitives/pom.xml
+++ b/core/store/primitives/pom.xml
@@ -70,7 +70,7 @@
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix</artifactId>
- <version>2.0.2</version>
+ <version>2.0.4</version>
</dependency>
<dependency>
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
index 40e9fa0..ab165bb 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import io.atomix.protocols.raft.cluster.MemberId;
@@ -27,6 +28,8 @@
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
import io.atomix.protocols.raft.protocol.CommandRequest;
import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
import io.atomix.protocols.raft.protocol.MetadataRequest;
@@ -87,6 +90,16 @@
}
@Override
+ public void registerHeartbeatHandler(Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> function) {
+ clusterCommunicator.addSubscriber(context.heartbeatSubject, serializer::decode, function, serializer::encode);
+ }
+
+ @Override
+ public void unregisterHeartbeatHandler() {
+ clusterCommunicator.removeSubscriber(context.heartbeatSubject);
+ }
+
+ @Override
public void reset(Collection<MemberId> members, ResetRequest request) {
Set<NodeId> nodes = members.stream().map(m -> NodeId.nodeId(m.id())).collect(Collectors.toSet());
clusterCommunicator.multicast(
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
index 2aecd0b..9f1f1bc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
@@ -22,6 +22,7 @@
*/
final class RaftMessageContext {
private final String prefix;
+ final MessageSubject heartbeatSubject;
final MessageSubject openSessionSubject;
final MessageSubject closeSessionSubject;
final MessageSubject keepAliveSubject;
@@ -40,6 +41,7 @@
RaftMessageContext(String prefix) {
this.prefix = prefix;
+ this.heartbeatSubject = getSubject(prefix, "heartbeat");
this.openSessionSubject = getSubject(prefix, "open");
this.closeSessionSubject = getSubject(prefix, "close");
this.keepAliveSubject = getSubject(prefix, "keep-alive");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index 9b8f3e6..08c5f48 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -29,6 +29,8 @@
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
@@ -154,6 +156,11 @@
}
@Override
+ public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId, HeartbeatRequest request) {
+ return sendAndReceive(context.heartbeatSubject, request, memberId);
+ }
+
+ @Override
public void registerOpenSessionHandler(
Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) {
clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
index 0333b00..8350499 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
@@ -39,6 +39,8 @@
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
@@ -106,6 +108,8 @@
.register(CloseSessionResponse.class)
.register(KeepAliveRequest.class)
.register(KeepAliveResponse.class)
+ .register(HeartbeatRequest.class)
+ .register(HeartbeatResponse.class)
.register(QueryRequest.class)
.register(QueryResponse.class)
.register(CommandRequest.class)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 8f1ffa3..a768a4f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -264,7 +264,8 @@
.withServiceType(DistributedPrimitive.Type.LEADER_ELECTOR.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
- .withTimeout(Duration.ofSeconds(5))
+ .withMinTimeout(Duration.ofSeconds(1))
+ .withMaxTimeout(Duration.ofSeconds(5))
.withMaxRetries(5)
.build()
.open()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index bb071a2..5e322d5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -42,7 +42,7 @@
private static final int MAX_SEGMENT_SIZE = 1024 * 1024 * 64;
private static final long ELECTION_TIMEOUT_MILLIS = 2500;
- private static final long HEARTBEAT_INTERVAL_MILLIS = 1000;
+ private static final long HEARTBEAT_INTERVAL_MILLIS = 250;
private final MemberId localMemberId;
private final StoragePartition partition;
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java
index 8720f45..698f015 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java
@@ -42,7 +42,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
AtomixAtomicCounterMapService service = new AtomixAtomicCounterMapService();
service.put(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
index 6563da5..09ac5ec 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
@@ -46,7 +46,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
AtomixConsistentMapService service = new AtomixConsistentMapService();
service.put(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
index fc9d34e..c05ebcb 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
@@ -50,7 +50,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
AtomixConsistentSetMultimapService service = new AtomixConsistentSetMultimapService();
service.put(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java
index 36d6874..fa7161f 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java
@@ -42,7 +42,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
AtomixCounterService service = new AtomixCounterService();
service.set(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java
index 555b82b..fd92e2b 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java
@@ -59,7 +59,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
AtomixDocumentTreeService service = new AtomixDocumentTreeService(ordering);
service.update(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java
index e4c13fb..2d69394 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java
@@ -15,8 +15,6 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import java.util.concurrent.ScheduledExecutorService;
-
import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.impl.RaftContext;
@@ -34,6 +32,8 @@
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.storage.StorageLevel;
import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.AtomixThreadFactory;
+import io.atomix.utils.concurrent.SingleThreadContextFactory;
import io.atomix.utils.concurrent.ThreadContext;
import org.junit.Test;
import org.onosproject.cluster.Leadership;
@@ -58,7 +58,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
DefaultServiceContext context = mock(DefaultServiceContext.class);
expect(context.serviceType()).andReturn(ServiceType.from(LEADER_ELECTOR.name())).anyTimes();
@@ -85,10 +85,11 @@
"test",
ServiceType.from(LEADER_ELECTOR.name()),
ReadConsistency.LINEARIZABLE,
+ 100,
5000,
context,
server,
- mock(ScheduledExecutorService.class)),
+ new SingleThreadContextFactory(new AtomixThreadFactory())),
System.currentTimeMillis()));
try (SnapshotWriter writer = snapshot.openWriter()) {
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index 6ed8ec3..f2b8385 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -56,6 +56,8 @@
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
@@ -117,6 +119,8 @@
.register(CloseSessionResponse.class)
.register(KeepAliveRequest.class)
.register(KeepAliveResponse.class)
+ .register(HeartbeatRequest.class)
+ .register(HeartbeatResponse.class)
.register(QueryRequest.class)
.register(QueryResponse.class)
.register(CommandRequest.class)
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java
index 6cb8667..6c69abe 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java
@@ -17,7 +17,6 @@
import java.util.Arrays;
import java.util.Collection;
-import java.util.concurrent.ScheduledExecutorService;
import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.cluster.MemberId;
@@ -36,6 +35,8 @@
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.storage.StorageLevel;
import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.AtomixThreadFactory;
+import io.atomix.utils.concurrent.SingleThreadContextFactory;
import io.atomix.utils.concurrent.ThreadContext;
import org.junit.Test;
import org.onosproject.store.service.Task;
@@ -60,7 +61,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
DefaultServiceContext context = mock(DefaultServiceContext.class);
expect(context.serviceType()).andReturn(ServiceType.from(WORK_QUEUE.name())).anyTimes();
@@ -79,10 +80,11 @@
"test",
ServiceType.from(WORK_QUEUE.name()),
ReadConsistency.LINEARIZABLE,
+ 100,
5000,
context,
server,
- mock(ScheduledExecutorService.class));
+ new SingleThreadContextFactory(new AtomixThreadFactory()));
AtomixWorkQueueService service = new AtomixWorkQueueService();
service.init(context);