[ONOS-7068] Upgrade to Atomix 2.0.4
Change-Id: I864567084b1ac230254533026f06d4a25e5e0a2d
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index e8dde09..b1bae21 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -87,7 +87,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected UpgradeService upgradeService;
- private static final long DEFAULT_ELECTION_TIMEOUT_MILLIS = 5000;
+ private static final long DEFAULT_ELECTION_TIMEOUT_MILLIS = 250;
@Property(name = "electionTimeoutMillis", longValue = DEFAULT_ELECTION_TIMEOUT_MILLIS,
label = "the leader election timeout in milliseconds")
private long electionTimeoutMillis = DEFAULT_ELECTION_TIMEOUT_MILLIS;
diff --git a/core/store/primitives/pom.xml b/core/store/primitives/pom.xml
index 88f30a5..549e73f 100644
--- a/core/store/primitives/pom.xml
+++ b/core/store/primitives/pom.xml
@@ -70,7 +70,7 @@
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix</artifactId>
- <version>2.0.2</version>
+ <version>2.0.4</version>
</dependency>
<dependency>
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
index ecba56d..45f40ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import io.atomix.protocols.raft.cluster.MemberId;
@@ -27,6 +28,8 @@
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
import io.atomix.protocols.raft.protocol.CommandRequest;
import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
import io.atomix.protocols.raft.protocol.MetadataRequest;
@@ -86,6 +89,16 @@
}
@Override
+ public void registerHeartbeatHandler(Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> function) {
+ clusterCommunicator.addSubscriber(context.heartbeatSubject, serializer::decode, function, serializer::encode);
+ }
+
+ @Override
+ public void unregisterHeartbeatHandler() {
+ clusterCommunicator.removeSubscriber(context.heartbeatSubject);
+ }
+
+ @Override
public void reset(Collection<MemberId> members, ResetRequest request) {
Set<NodeId> nodes = members.stream().map(m -> NodeId.nodeId(m.id())).collect(Collectors.toSet());
clusterCommunicator.multicast(
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
index 2aecd0b..9f1f1bc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
@@ -22,6 +22,7 @@
*/
final class RaftMessageContext {
private final String prefix;
+ final MessageSubject heartbeatSubject;
final MessageSubject openSessionSubject;
final MessageSubject closeSessionSubject;
final MessageSubject keepAliveSubject;
@@ -40,6 +41,7 @@
RaftMessageContext(String prefix) {
this.prefix = prefix;
+ this.heartbeatSubject = getSubject(prefix, "heartbeat");
this.openSessionSubject = getSubject(prefix, "open");
this.closeSessionSubject = getSubject(prefix, "close");
this.keepAliveSubject = getSubject(prefix, "keep-alive");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index 097ee46..f58db45 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -29,6 +29,8 @@
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
@@ -153,6 +155,11 @@
}
@Override
+ public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId, HeartbeatRequest request) {
+ return sendAndReceive(context.heartbeatSubject, request, memberId);
+ }
+
+ @Override
public void registerOpenSessionHandler(
Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) {
clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
index 0333b00..8350499 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
@@ -39,6 +39,8 @@
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
@@ -106,6 +108,8 @@
.register(CloseSessionResponse.class)
.register(KeepAliveRequest.class)
.register(KeepAliveResponse.class)
+ .register(HeartbeatRequest.class)
+ .register(HeartbeatResponse.class)
.register(QueryRequest.class)
.register(QueryResponse.class)
.register(CommandRequest.class)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 53a7dd1..f5f8a99 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -264,7 +264,8 @@
.withServiceType(DistributedPrimitive.Type.LEADER_ELECTOR.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
- .withTimeout(Duration.ofMillis(timeUnit.toMillis(leaderTimeout)))
+ .withMinTimeout(Duration.ofMillis(timeUnit.toMillis(leaderTimeout)))
+ .withMaxTimeout(Duration.ofSeconds(5))
.withMaxRetries(5)
.build()
.open()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 3a15fce..dbe712d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -45,7 +45,7 @@
private static final int MAX_SEGMENT_SIZE = 1024 * 1024 * 64;
private static final long ELECTION_TIMEOUT_MILLIS = 2500;
- private static final long HEARTBEAT_INTERVAL_MILLIS = 1000;
+ private static final long HEARTBEAT_INTERVAL_MILLIS = 250;
private final MemberId localMemberId;
private final StoragePartition partition;
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java
index 8720f45..698f015 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java
@@ -42,7 +42,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
AtomixAtomicCounterMapService service = new AtomixAtomicCounterMapService();
service.put(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
index 6563da5..09ac5ec 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
@@ -46,7 +46,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
AtomixConsistentMapService service = new AtomixConsistentMapService();
service.put(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
index fc9d34e..c05ebcb 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
@@ -50,7 +50,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
AtomixConsistentSetMultimapService service = new AtomixConsistentSetMultimapService();
service.put(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java
index 36d6874..fa7161f 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java
@@ -42,7 +42,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
AtomixCounterService service = new AtomixCounterService();
service.set(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java
index 555b82b..fd92e2b 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java
@@ -59,7 +59,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
AtomixDocumentTreeService service = new AtomixDocumentTreeService(ordering);
service.update(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java
index e4c13fb..2d69394 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java
@@ -15,8 +15,6 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import java.util.concurrent.ScheduledExecutorService;
-
import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.impl.RaftContext;
@@ -34,6 +32,8 @@
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.storage.StorageLevel;
import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.AtomixThreadFactory;
+import io.atomix.utils.concurrent.SingleThreadContextFactory;
import io.atomix.utils.concurrent.ThreadContext;
import org.junit.Test;
import org.onosproject.cluster.Leadership;
@@ -58,7 +58,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
DefaultServiceContext context = mock(DefaultServiceContext.class);
expect(context.serviceType()).andReturn(ServiceType.from(LEADER_ELECTOR.name())).anyTimes();
@@ -85,10 +85,11 @@
"test",
ServiceType.from(LEADER_ELECTOR.name()),
ReadConsistency.LINEARIZABLE,
+ 100,
5000,
context,
server,
- mock(ScheduledExecutorService.class)),
+ new SingleThreadContextFactory(new AtomixThreadFactory())),
System.currentTimeMillis()));
try (SnapshotWriter writer = snapshot.openWriter()) {
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index be24d00..1ade931 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -56,6 +56,8 @@
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
@@ -116,6 +118,8 @@
.register(CloseSessionResponse.class)
.register(KeepAliveRequest.class)
.register(KeepAliveResponse.class)
+ .register(HeartbeatRequest.class)
+ .register(HeartbeatResponse.class)
.register(QueryRequest.class)
.register(QueryResponse.class)
.register(CommandRequest.class)
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java
index 6cb8667..6c69abe 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java
@@ -17,7 +17,6 @@
import java.util.Arrays;
import java.util.Collection;
-import java.util.concurrent.ScheduledExecutorService;
import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.cluster.MemberId;
@@ -36,6 +35,8 @@
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.storage.StorageLevel;
import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.AtomixThreadFactory;
+import io.atomix.utils.concurrent.SingleThreadContextFactory;
import io.atomix.utils.concurrent.ThreadContext;
import org.junit.Test;
import org.onosproject.store.service.Task;
@@ -60,7 +61,7 @@
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
DefaultServiceContext context = mock(DefaultServiceContext.class);
expect(context.serviceType()).andReturn(ServiceType.from(WORK_QUEUE.name())).anyTimes();
@@ -79,10 +80,11 @@
"test",
ServiceType.from(WORK_QUEUE.name()),
ReadConsistency.LINEARIZABLE,
+ 100,
5000,
context,
server,
- mock(ScheduledExecutorService.class));
+ new SingleThreadContextFactory(new AtomixThreadFactory()));
AtomixWorkQueueService service = new AtomixWorkQueueService();
service.init(context);
diff --git a/features/features.xml b/features/features.xml
index e920ab0..707bcec 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -59,7 +59,7 @@
<bundle>mvn:com.typesafe/config/1.2.1</bundle>
<bundle>mvn:com.googlecode.concurrent-trees/concurrent-trees/2.6.0</bundle>
<bundle>mvn:commons-io/commons-io/2.4</bundle>
- <bundle>mvn:io.atomix/atomix/2.0.2</bundle>
+ <bundle>mvn:io.atomix/atomix/2.0.4</bundle>
<bundle>mvn:org.glassfish.jersey.core/jersey-client/2.25.1</bundle>
diff --git a/lib/BUCK b/lib/BUCK
index 478049d..62ade4c 100644
--- a/lib/BUCK
+++ b/lib/BUCK
@@ -1,4 +1,4 @@
-# ***** This file was auto-generated at Tue, 31 Oct 2017 20:36:56 GMT. Do not edit this file manually. *****
+# ***** This file was auto-generated at Fri, 3 Nov 2017 05:46:03 GMT. Do not edit this file manually. *****
# ***** Use onos-lib-gen *****
pass_thru_pom(
@@ -210,10 +210,10 @@
remote_jar (
name = 'atomix',
- out = 'atomix-2.0.2.jar',
- url = 'mvn:io.atomix:atomix:jar:2.0.2',
- sha1 = '9b3b05af6337c35bc69922fb7b535c78f07a5f5b',
- maven_coords = 'io.atomix:atomix:2.0.2',
+ out = 'atomix-2.0.4.jar',
+ url = 'mvn:io.atomix:atomix:jar:2.0.4',
+ sha1 = 'ad9165144b95174ab8c8ba1ae7481e6ca8a699e3',
+ maven_coords = 'io.atomix:atomix:2.0.4',
visibility = [ 'PUBLIC' ],
)
diff --git a/lib/deps.json b/lib/deps.json
index ffb46ed..f83ebf6 100644
--- a/lib/deps.json
+++ b/lib/deps.json
@@ -120,7 +120,7 @@
"aopalliance-repackaged": "mvn:org.glassfish.hk2.external:aopalliance-repackaged:2.5.0-b32",
"amqp-client": "mvn:com.rabbitmq:amqp-client:jar:3.6.1",
"asm": "mvn:org.ow2.asm:asm:5.0.4",
- "atomix": "mvn:io.atomix:atomix:2.0.2",
+ "atomix": "mvn:io.atomix:atomix:2.0.4",
"commons-codec": "mvn:commons-codec:commons-codec:1.10",
"commons-collections": "mvn:commons-collections:commons-collections:3.2.2",
"commons-configuration": "mvn:commons-configuration:commons-configuration:1.10",