Using latest atomix release candidate + Updates to CopycatTransport
Change-Id: I960af428ff733ee7467024811e3b3470e951ecb7
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
index b5b1f21..3491987 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
@@ -86,7 +86,7 @@
/**
* Maps {@link Address address} to {@link Endpoint endpoint}.
- * @param address
+ * @param address address
* @return end point
*/
public static Endpoint toEndpoint(Address address) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
index 9a1dce9..5b15d33 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
@@ -16,6 +16,7 @@
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -23,8 +24,10 @@
import org.apache.commons.lang.math.RandomUtils;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
@@ -36,26 +39,30 @@
*/
public class CopycatTransportClient implements Client {
+ private final Logger log = getLogger(getClass());
private final PartitionId partitionId;
private final MessagingService messagingService;
private final CopycatTransport.Mode mode;
+ private final String newConnectionMessageSubject;
private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
CopycatTransportClient(PartitionId partitionId, MessagingService messagingService, CopycatTransport.Mode mode) {
this.partitionId = checkNotNull(partitionId);
this.messagingService = checkNotNull(messagingService);
this.mode = checkNotNull(mode);
+ this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
}
@Override
public CompletableFuture<Connection> connect(Address remoteAddress) {
ThreadContext context = ThreadContext.currentContextOrThrow();
return messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
- PartitionManager.HELLO_MESSAGE_SUBJECT,
- "hello".getBytes())
- .thenApplyAsync(r -> {
+ newConnectionMessageSubject,
+ Longs.toByteArray(nextConnectionId()))
+ .thenApplyAsync(bytes -> {
+ long connectionId = Longs.fromByteArray(bytes);
CopycatTransportConnection connection = new CopycatTransportConnection(
- nextConnectionId(),
+ connectionId,
CopycatTransport.Mode.CLIENT,
partitionId,
remoteAddress,
@@ -64,6 +71,7 @@
if (mode == CopycatTransport.Mode.CLIENT) {
connection.setBidirectional();
}
+ log.debug("Created new outgoing connection[id={}] to {}", connectionId, remoteAddress);
connections.add(connection);
return connection;
}, context.executor());
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index 39ce10f..d1b686f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -31,12 +31,14 @@
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
@@ -52,6 +54,7 @@
*/
public class CopycatTransportConnection implements Connection {
+ private final Logger log = getLogger(getClass());
private final Listeners<Throwable> exceptionListeners = new Listeners<>();
private final Listeners<Connection> closeListeners = new Listeners<>();
@@ -82,11 +85,11 @@
this.remoteAddress = checkNotNull(address);
this.messagingService = checkNotNull(messagingService);
if (mode == CopycatTransport.Mode.CLIENT) {
- this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
- this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
+ this.outboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
+ this.inboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
} else {
- this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
- this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
+ this.outboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
+ this.inboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
}
this.context = checkNotNull(context);
}
@@ -203,6 +206,7 @@
@Override
public CompletableFuture<Void> close() {
+ log.debug("Closing connection[id={}, mode={}] to {}", connectionId, mode, remoteAddress);
closeListeners.forEach(listener -> listener.accept(this));
if (mode == CopycatTransport.Mode.CLIENT) {
messagingService.unregisterHandler(inboundMessageSubject);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index 36a1958..2f24d6b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -35,6 +35,7 @@
import org.slf4j.Logger;
import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
@@ -54,13 +55,15 @@
private final ScheduledExecutorService executorService;
private final PartitionId partitionId;
private final MessagingService messagingService;
- private final String messageSubject;
+ private final String protocolMessageSubject;
+ private final String newConnectionMessageSubject;
private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
this.partitionId = checkNotNull(partitionId);
this.messagingService = checkNotNull(messagingService);
- this.messageSubject = String.format("onos-copycat-%s", partitionId);
+ this.protocolMessageSubject = String.format("onos-copycat-server-%s", partitionId);
+ this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),
new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
}
@@ -68,49 +71,49 @@
@Override
public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
if (listening.compareAndSet(false, true)) {
+ // message handler for all non-connection-establishment messages.
+ messagingService.registerHandler(protocolMessageSubject, (sender, payload) -> {
+ try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
+ long connectionId = input.readLong();
+ CopycatTransportConnection connection = connections.get(connectionId);
+ if (connection == null) {
+ throw new IOException("Closed connection");
+ }
+ byte[] messagePayload = IOUtils.toByteArray(input);
+ return connection.handle(messagePayload);
+ } catch (IOException e) {
+ return Tools.exceptionalFuture(e);
+ }
+ });
+
+ // message handler for new connection attempts.
ThreadContext context = ThreadContext.currentContextOrThrow();
- listen(address, listener, context);
+ messagingService.registerHandler(newConnectionMessageSubject, (sender, payload) -> {
+ long connectionId = Longs.fromByteArray(payload);
+ CopycatTransportConnection connection = new CopycatTransportConnection(connectionId,
+ CopycatTransport.Mode.SERVER,
+ partitionId,
+ CopycatTransport.toAddress(sender),
+ messagingService,
+ getOrCreateContext(context));
+ connections.put(connectionId, connection);
+ connection.closeListener(c -> connections.remove(connectionId, c));
+ log.debug("Created new incoming connection[id={}] from {}", connectionId, sender);
+ return CompletableFuture.supplyAsync(() -> {
+ listener.accept(connection);
+ // echo the connectionId back to indicate successful completion.
+ return payload;
+ }, context.executor());
+ });
+ context.execute(() -> listenFuture.complete(null));
}
return listenFuture;
}
- private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
- messagingService.registerHandler(messageSubject, (sender, payload) -> {
- try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
- long connectionId = input.readLong();
- AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
- CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
- newConnectionCreated.set(true);
- CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
- CopycatTransport.Mode.SERVER,
- partitionId,
- CopycatTransport.toAddress(sender),
- messagingService,
- getOrCreateContext(context));
- log.debug("Created new incoming connection {}", connectionId);
- newConnection.closeListener(c -> connections.remove(connectionId, c));
- return newConnection;
- });
- byte[] request = IOUtils.toByteArray(input);
- return CompletableFuture.supplyAsync(
- () -> {
- if (newConnectionCreated.get()) {
- listener.accept(connection);
- }
- return connection;
- }, context.executor()).thenCompose(c -> c.handle(request));
- } catch (IOException e) {
- return Tools.exceptionalFuture(e);
- }
- });
- context.execute(() -> {
- listenFuture.complete(null);
- });
- }
-
@Override
public CompletableFuture<Void> close() {
- messagingService.unregisterHandler(messageSubject);
+ messagingService.unregisterHandler(newConnectionMessageSubject);
+ messagingService.unregisterHandler(protocolMessageSubject);
executorService.shutdown();
return CompletableFuture.completedFuture(null);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
index 78fcedd..d567adc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
@@ -54,7 +54,7 @@
}
@Override
- public void write(T object, BufferOutput<?> buffer,
+ public void write(T object, BufferOutput buffer,
io.atomix.catalyst.serializer.Serializer serializer) {
try {
byte[] payload = this.serializer.encode(object);
@@ -66,7 +66,7 @@
}
@Override
- public T read(Class<T> type, BufferInput<?> buffer,
+ public T read(Class<T> type, BufferInput buffer,
io.atomix.catalyst.serializer.Serializer serializer) {
int size = buffer.readInt();
try {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 3826116..54abda2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -27,6 +27,7 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
@@ -56,7 +57,6 @@
public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
implements PartitionService, PartitionAdminService {
- public static final String HELLO_MESSAGE_SUBJECT = "partition-manager-hello";
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -72,8 +72,6 @@
@Activate
public void activate() {
- messagingService.registerHandler(HELLO_MESSAGE_SUBJECT,
- (ep, input) -> CompletableFuture.completedFuture(input));
eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
metadataService.getClusterMetadata()
@@ -93,8 +91,8 @@
log.info("Started");
}
+ @Deactivate
public void deactivate() {
- messagingService.unregisterHandler(HELLO_MESSAGE_SUBJECT);
eventDispatcher.removeSink(PartitionEvent.class);
CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index f1eb4ce..2a3b834 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -90,7 +90,7 @@
private TransactionCoordinator transactionCoordinator;
@Activate
- public void actiavte() {
+ public void activate() {
basePrimitiveCreator = partitionService.getDistributedPrimitiveCreator(PartitionId.from(0));
Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
partitionService.getAllPartitionIds().stream()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
index 38b811e..3fb243d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
@@ -19,14 +19,14 @@
import java.util.Collection;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.service.PartitionInfo;
import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
/**
* Operational details for a {@code StoragePartition}.
@@ -98,9 +98,11 @@
* @return partition info
*/
public PartitionInfo toPartitionInfo() {
+ Function<Member, String> memberToString =
+ m -> m == null ? "none" : String.format("%s:%d", m.address().host(), m.address().port());
return new PartitionInfo(partitionId.toString(),
leaderTerm,
- Lists.transform(ImmutableList.copyOf(activeMembers), m -> m.address().toString()),
- leader == null ? "none" : leader.address().toString());
+ activeMembers.stream().map(memberToString).collect(Collectors.toList()),
+ memberToString.apply(leader));
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 834d8e9..dcf98f6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -22,6 +22,7 @@
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
+import io.atomix.manager.ResourceManagerTypeResolver;
import io.atomix.manager.state.ResourceManagerState;
import io.atomix.resource.ResourceRegistry;
import io.atomix.resource.ResourceType;
@@ -107,7 +108,7 @@
ResourceRegistry registry = new ResourceRegistry();
resourceTypes.forEach(registry::register);
resourceResolver.resolve(registry);
- return CopycatServer.builder(localAddress, partition.getMemberAddresses())
+ CopycatServer server = CopycatServer.builder(localAddress, partition.getMemberAddresses())
.withName("partition-" + partition.getId())
.withSerializer(serializer.clone())
.withTransport(transport.get())
@@ -119,6 +120,8 @@
.withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)
.build())
.build();
+ server.serializer().resolve(new ResourceManagerTypeResolver(registry));
+ return server;
}
public Set<NodeId> configuredMembers() {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 73bc8b7..df44b48 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -43,8 +43,10 @@
/**
* Distributed resource providing the {@link AsyncConsistentMap} primitive.
*/
-@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
-public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
+@ResourceTypeInfo(id = -151,
+ stateMachine = AtomixConsistentMapState.class,
+ typeResolver = AtomixConsistentMapCommands.TypeResolver.class)
+public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
implements AsyncConsistentMap<String, byte[]> {
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 72e52c2..9c9b019 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -26,6 +26,7 @@
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
+import io.atomix.resource.ResourceType;
import java.util.Collection;
import java.util.HashMap;
@@ -69,12 +70,17 @@
* State Machine for {@link AtomixConsistentMap} resource.
*/
public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
+
private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
private final Set<String> preparedKeys = Sets.newHashSet();
private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
private AtomicLong versionCounter = new AtomicLong(0);
+ public AtomixConsistentMapState() {
+ super(new ResourceType(AtomixConsistentMap.class));
+ }
+
@Override
public void snapshot(SnapshotWriter writer) {
writer.writeLong(versionCounter.get());
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 4e15a81..b7e48fa 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -25,7 +25,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
-import org.onlab.util.SharedExecutors;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
@@ -36,8 +35,10 @@
/**
* Distributed resource providing the {@link AsyncLeaderElector} primitive.
*/
-@ResourceTypeInfo(id = -152, stateMachine = AtomixLeaderElectorState.class)
-public class AtomixLeaderElector extends Resource<AtomixLeaderElector, Resource.Options>
+@ResourceTypeInfo(id = -152,
+ stateMachine = AtomixLeaderElectorState.class,
+ typeResolver = AtomixLeaderElectorCommands.TypeResolver.class)
+public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
implements AsyncLeaderElector {
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Sets.newConcurrentHashSet();
@@ -62,8 +63,7 @@
}
private void handleEvent(Change<Leadership> change) {
- SharedExecutors.getSingleThreadExecutor().execute(() ->
- leadershipChangeListeners.forEach(l -> l.accept(change)));
+ leadershipChangeListeners.forEach(l -> l.accept(change));
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index e8abfac..9b58226 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -24,6 +24,7 @@
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
+import io.atomix.resource.ResourceType;
import java.util.Arrays;
import java.util.HashMap;
@@ -72,6 +73,10 @@
ElectionState.class,
Registration.class);
+ public AtomixLeaderElectorState() {
+ super(new ResourceType(AtomixLeaderElector.class));
+ }
+
@Override
protected void configure(StateMachineExecutor executor) {
// Notification
@@ -261,7 +266,7 @@
}
private void onSessionEnd(Session session) {
- Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session);
+ Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
if (listener != null) {
listener.close();
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index d655d52..a5c197b 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -110,7 +110,6 @@
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.DISK)
.withDirectory(TEST_DIR + "/" + address.port())
- .withSerializer(serializer.clone())
.build())
.withStateMachine(() -> new ResourceManagerState(resourceRegistry))
.withSerializer(serializer.clone())
diff --git a/pom.xml b/pom.xml
index 0042de6..4280443 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,7 @@
<onos-build-conf.version>1.2-SNAPSHOT</onos-build-conf.version>
<netty4.version>4.0.33.Final</netty4.version>
<!-- TODO: replace with final release version when it is out -->
- <atomix.version>0.1.0-beta5</atomix.version>
+ <atomix.version>1.0.0-rc1</atomix.version>
<copycat.version>0.5.1.onos</copycat.version>
<openflowj.version>0.9.1.onos</openflowj.version>
<onos-maven-plugin.version>1.8-SNAPSHOT</onos-maven-plugin.version>