[ONOS-6594] Upgrade to Atomix 2.0.0
Change-Id: I6534bca1c8570b4e017f682953b876da29146675
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index f0c0609..c1ea37c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -15,24 +15,23 @@
*/
package org.onosproject.store.primitives.impl;
-import static org.slf4j.LoggerFactory.getLogger;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Transport;
-import io.atomix.copycat.server.CopycatServer;
-import io.atomix.copycat.server.storage.Storage;
-import io.atomix.copycat.server.storage.StorageLevel;
-import io.atomix.manager.internal.ResourceManagerState;
-import io.atomix.manager.util.ResourceManagerTypeResolver;
-
import java.io.File;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+import io.atomix.protocols.raft.RaftServer;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.storage.StorageLevel;
+import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
import org.onosproject.store.service.PartitionInfo;
+import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
+import static org.slf4j.LoggerFactory.getLogger;
+
/**
* {@link StoragePartition} server.
*/
@@ -41,36 +40,34 @@
private final Logger log = getLogger(getClass());
private static final int MAX_ENTRIES_PER_LOG_SEGMENT = 32768;
+ private final MemberId localMemberId;
private final StoragePartition partition;
- private final Address localAddress;
- private final Supplier<Transport> transport;
- private final Serializer serializer;
+ private final Supplier<RaftServerProtocol> protocol;
private final File dataFolder;
- private CopycatServer server;
+ private RaftServer server;
- public StoragePartitionServer(Address localAddress,
+ public StoragePartitionServer(
StoragePartition partition,
- Serializer serializer,
- Supplier<Transport> transport,
+ MemberId localMemberId,
+ Supplier<RaftServerProtocol> protocol,
File dataFolder) {
this.partition = partition;
- this.localAddress = localAddress;
- this.serializer = serializer;
- this.transport = transport;
+ this.localMemberId = localMemberId;
+ this.protocol = protocol;
this.dataFolder = dataFolder;
}
@Override
public CompletableFuture<Void> open() {
- CompletableFuture<CopycatServer> serverOpenFuture;
- if (partition.getMemberAddresses().contains(localAddress)) {
+ CompletableFuture<RaftServer> serverOpenFuture;
+ if (partition.getMemberIds().contains(localMemberId)) {
if (server != null && server.isRunning()) {
return CompletableFuture.completedFuture(null);
}
synchronized (this) {
server = buildServer();
}
- serverOpenFuture = server.bootstrap(partition.getMemberAddresses());
+ serverOpenFuture = server.bootstrap(partition.getMemberIds());
} else {
serverOpenFuture = CompletableFuture.completedFuture(null);
}
@@ -96,24 +93,21 @@
return server.leave();
}
- private CopycatServer buildServer() {
- CopycatServer server = CopycatServer.builder(localAddress)
+ private RaftServer buildServer() {
+ RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
.withName("partition-" + partition.getId())
- .withSerializer(serializer.clone())
- .withTransport(transport.get())
- .withStateMachine(ResourceManagerState::new)
- .withStorage(Storage.builder()
+ .withProtocol(protocol.get())
+ .withStorage(RaftStorage.newBuilder()
.withStorageLevel(StorageLevel.DISK)
- .withCompactionThreads(1)
+ .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
.withDirectory(dataFolder)
.withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)
- .build())
- .build();
- server.serializer().resolve(new ResourceManagerTypeResolver());
- return server;
+ .build());
+ StoragePartition.RAFT_SERVICES.forEach(builder::addService);
+ return builder.build();
}
- public CompletableFuture<Void> join(Collection<Address> otherMembers) {
+ public CompletableFuture<Void> join(Collection<MemberId> otherMembers) {
server = buildServer();
return server.join(otherMembers).whenComplete((r, e) -> {
if (e == null) {
@@ -135,9 +129,9 @@
*/
public PartitionInfo info() {
return new StoragePartitionDetails(partition.getId(),
- server.cluster().members(),
- server.cluster().members(),
- server.cluster().leader(),
- server.cluster().term()).toPartitionInfo();
+ server.cluster().getMembers(),
+ server.cluster().getMembers(),
+ server.cluster().getLeader(),
+ server.cluster().getTerm()).toPartitionInfo();
}
}