Migrating to latest Atomix
Change-Id: Ie636d1b2623b7f83572dca0d70bd56734379e61a
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 3cde4bd..175e253 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -19,9 +19,8 @@
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.TypeSerializerFactory;
-import io.atomix.copycat.Query;
import io.atomix.manager.util.ResourceManagerTypeResolver;
-import io.atomix.variables.state.LongCommands;
+import io.atomix.variables.internal.LongCommands;
import org.onlab.util.Match;
import org.onosproject.cluster.Leader;
@@ -63,8 +62,7 @@
Transaction.State.class,
PrepareResult.class,
CommitResult.class,
- RollbackResult.class,
- Query.ConsistencyLevel.class));
+ RollbackResult.class));
// ONOS classes
serializer.register(Change.class, factory);
serializer.register(Leader.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
index 66ae253..7a1cecf 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
@@ -29,7 +29,7 @@
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.util.concurrent.ThreadContext;
+import io.atomix.catalyst.concurrent.ThreadContext;
/**
* {@link Client} implementation for {@link CopycatTransport}.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index 333ec3d..8d4b577 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -15,6 +15,18 @@
*/
package org.onosproject.store.primitives.impl;
+import static com.google.common.base.Preconditions.checkNotNull;
+import io.atomix.catalyst.concurrent.Listener;
+import io.atomix.catalyst.concurrent.Listeners;
+import io.atomix.catalyst.concurrent.ThreadContext;
+import io.atomix.catalyst.serializer.SerializationException;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Connection;
+import io.atomix.catalyst.transport.MessageHandler;
+import io.atomix.catalyst.transport.TransportException;
+import io.atomix.catalyst.util.Assert;
+import io.atomix.catalyst.util.reference.ReferenceCounted;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -26,8 +38,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
-
-
import org.apache.commons.io.IOUtils;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
@@ -38,18 +48,6 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
-import static com.google.common.base.Preconditions.checkNotNull;
-import io.atomix.catalyst.serializer.SerializationException;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.transport.MessageHandler;
-import io.atomix.catalyst.transport.TransportException;
-import io.atomix.catalyst.util.Assert;
-import io.atomix.catalyst.util.Listener;
-import io.atomix.catalyst.util.Listeners;
-import io.atomix.catalyst.util.ReferenceCounted;
-import io.atomix.catalyst.util.concurrent.ThreadContext;
-
/**
* {@link Connection} implementation for CopycatTransport.
*/
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index 345a0d7..ff64112 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -17,6 +17,12 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.catalyst.concurrent.CatalystThreadFactory;
+import io.atomix.catalyst.concurrent.SingleThreadContext;
+import io.atomix.catalyst.concurrent.ThreadContext;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Connection;
+import io.atomix.catalyst.transport.Server;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
@@ -36,13 +42,6 @@
import com.google.common.collect.Maps;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.transport.Server;
-import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
-import io.atomix.catalyst.util.concurrent.SingleThreadContext;
-import io.atomix.catalyst.util.concurrent.ThreadContext;
-
/**
* {@link Server} implementation for {@link CopycatTransport}.
*/
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingCopycatClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingCopycatClient.java
new file mode 100644
index 0000000..8955a6d
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingCopycatClient.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import io.atomix.catalyst.concurrent.Listener;
+import io.atomix.catalyst.concurrent.ThreadContext;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Transport;
+import io.atomix.copycat.Command;
+import io.atomix.copycat.Query;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.session.Session;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * {@code CopycatClient} that merely delegates control to
+ * another CopycatClient.
+ */
+public class DelegatingCopycatClient implements CopycatClient {
+
+ protected final CopycatClient client;
+
+ DelegatingCopycatClient(CopycatClient client) {
+ this.client = client;
+ }
+
+ @Override
+ public State state() {
+ return client.state();
+ }
+
+ @Override
+ public Listener<State> onStateChange(Consumer<State> callback) {
+ return client.onStateChange(callback);
+ }
+
+ @Override
+ public ThreadContext context() {
+ return client.context();
+ }
+
+ @Override
+ public Transport transport() {
+ return client.transport();
+ }
+
+ @Override
+ public Serializer serializer() {
+ return client.serializer();
+ }
+
+ @Override
+ public Session session() {
+ return client.session();
+ }
+
+ @Override
+ public <T> CompletableFuture<T> submit(Command<T> command) {
+ return client.submit(command);
+ }
+
+ @Override
+ public <T> CompletableFuture<T> submit(Query<T> query) {
+ return client.submit(query);
+ }
+
+ @Override
+ public Listener<Void> onEvent(String event, Runnable callback) {
+ return client.onEvent(event, callback);
+ }
+
+ @Override
+ public <T> Listener<T> onEvent(String event, Consumer<T> callback) {
+ return client.onEvent(event, callback);
+ }
+
+ @Override
+ public CompletableFuture<CopycatClient> connect(Collection<Address> members) {
+ return client.connect(members);
+ }
+
+ @Override
+ public CompletableFuture<CopycatClient> recover() {
+ return client.recover();
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ return client.close();
+ }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/QueryRetryingCopycatClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/QueryRetryingCopycatClient.java
new file mode 100644
index 0000000..39ed6c3
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/QueryRetryingCopycatClient.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.net.ConnectException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Throwables;
+
+import io.atomix.catalyst.transport.TransportException;
+import io.atomix.copycat.Query;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.error.QueryException;
+import io.atomix.copycat.error.UnknownSessionException;
+import io.atomix.copycat.session.ClosedSessionException;
+
+/**
+ * {@code CopycatClient} that can retry when certain recoverable errors are encoutered.
+ */
+public class QueryRetryingCopycatClient extends DelegatingCopycatClient {
+
+ private final int maxRetries;
+ private final long delayBetweenRetriesMillis;
+ private final ScheduledExecutorService executor;
+ private final Logger log = getLogger(getClass());
+
+ private final Predicate<Throwable> retryableCheck = e -> e instanceof ConnectException
+ || e instanceof TimeoutException
+ || e instanceof TransportException
+ || e instanceof ClosedChannelException
+ || e instanceof QueryException
+ || e instanceof UnknownSessionException
+ || e instanceof ClosedSessionException;
+
+ QueryRetryingCopycatClient(CopycatClient client, int maxRetries, long delayBetweenRetriesMillis) {
+ super(client);
+ this.maxRetries = maxRetries;
+ this.delayBetweenRetriesMillis = delayBetweenRetriesMillis;
+ this.executor = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ executor.shutdown();
+ return super.close();
+ }
+
+ @Override
+ public <T> CompletableFuture<T> submit(Query<T> query) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ executor.submit(() -> submit(query, 1, future));
+ return future;
+ }
+
+ private <T> void submit(Query<T> query, int attemptIndex, CompletableFuture<T> future) {
+ client.submit(query).whenComplete((r, e) -> {
+ if (e != null) {
+ if (attemptIndex < maxRetries + 1 && retryableCheck.test(Throwables.getRootCause(e))) {
+ log.debug("Retry attempt ({} of {}). Failure due to {}",
+ attemptIndex, maxRetries, Throwables.getRootCause(e).getClass());
+ executor.schedule(() ->
+ submit(query, attemptIndex + 1, future), delayBetweenRetriesMillis, TimeUnit.MILLISECONDS);
+ } else {
+ future.completeExceptionally(e);
+ }
+ } else {
+ future.complete(r);
+ }
+ });
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index e1086cf..63037d0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -16,22 +16,18 @@
package org.onosproject.store.primitives.impl;
import static org.slf4j.LoggerFactory.getLogger;
-import io.atomix.Atomix;
import io.atomix.AtomixClient;
-import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Transport;
-import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
import io.atomix.copycat.client.ConnectionStrategies;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.client.CopycatClient.State;
import io.atomix.copycat.client.RecoveryStrategies;
-import io.atomix.copycat.client.RetryStrategies;
import io.atomix.copycat.client.ServerSelectionStrategies;
import io.atomix.manager.ResourceClient;
-import io.atomix.manager.state.ResourceManagerException;
+import io.atomix.manager.ResourceManagerException;
import io.atomix.manager.util.ResourceManagerTypeResolver;
+import io.atomix.resource.ResourceRegistry;
import io.atomix.resource.ResourceType;
-import io.atomix.resource.util.ResourceRegistry;
import io.atomix.variables.DistributedLong;
import java.util.Collection;
@@ -70,8 +66,8 @@
private final StoragePartition partition;
private final Transport transport;
private final io.atomix.catalyst.serializer.Serializer serializer;
- private Atomix client;
- private CopycatClient copycatClient;
+ private AtomixClient client;
+ private ResourceClient resourceClient;
private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
@@ -99,19 +95,15 @@
@Override
public CompletableFuture<Void> open() {
- if (client != null && client.isOpen()) {
- return CompletableFuture.completedFuture(null);
- }
synchronized (StoragePartitionClient.this) {
- copycatClient = newCopycatClient(partition.getMemberAddresses(),
- transport,
+ resourceClient = newResourceClient(transport,
serializer.clone(),
StoragePartition.RESOURCE_TYPES);
- copycatClient.onStateChange(state -> log.debug("Partition {} client state"
+ resourceClient.client().onStateChange(state -> log.debug("Partition {} client state"
+ " changed to {}", partition.getId(), state));
- client = new AtomixClient(new ResourceClient(copycatClient));
+ client = new AtomixClient(resourceClient);
}
- return client.open().whenComplete((r, e) -> {
+ return client.connect(partition.getMemberAddresses()).whenComplete((r, e) -> {
if (e == null) {
log.info("Successfully started client for partition {}", partition.getId());
} else {
@@ -132,7 +124,7 @@
atomixConsistentMap.statusChangeListeners()
.forEach(listener -> listener.accept(mapper.apply(state)));
};
- copycatClient.onStateChange(statusListener);
+ resourceClient.client().onStateChange(statusListener);
AsyncConsistentMap<String, byte[]> rawMap =
new DelegatingAsyncConsistentMap<String, byte[]>(atomixConsistentMap) {
@Override
@@ -173,7 +165,15 @@
@Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
- return client.getResource(name, AtomixLeaderElector.class).join();
+ AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
+ .thenCompose(AtomixLeaderElector::setupCache)
+ .join();
+ Consumer<State> statusListener = state -> {
+ leaderElector.statusChangeListeners()
+ .forEach(listener -> listener.accept(mapper.apply(state)));
+ };
+ resourceClient.client().onStateChange(statusListener);
+ return leaderElector;
}
@Override
@@ -188,7 +188,7 @@
@Override
public boolean isOpen() {
- return client.isOpen();
+ return resourceClient.client().state() != State.CLOSED;
}
/**
@@ -198,33 +198,33 @@
public PartitionClientInfo clientInfo() {
return new PartitionClientInfo(partition.getId(),
partition.getMembers(),
- copycatClient.session().id(),
- mapper.apply(copycatClient.state()));
+ resourceClient.client().session().id(),
+ mapper.apply(resourceClient.client().state()));
}
- private CopycatClient newCopycatClient(Collection<Address> members,
- Transport transport,
+ private ResourceClient newResourceClient(Transport transport,
io.atomix.catalyst.serializer.Serializer serializer,
Collection<ResourceType> resourceTypes) {
ResourceRegistry registry = new ResourceRegistry();
resourceTypes.forEach(registry::register);
- CopycatClient client = CopycatClient.builder(members)
+ CopycatClient copycatClient = CopycatClient.builder()
.withServerSelectionStrategy(ServerSelectionStrategies.ANY)
.withConnectionStrategy(ConnectionStrategies.FIBONACCI_BACKOFF)
.withRecoveryStrategy(RecoveryStrategies.RECOVER)
- .withRetryStrategy(RetryStrategies.FIBONACCI_BACKOFF)
.withTransport(transport)
.withSerializer(serializer)
- .withThreadFactory(new CatalystThreadFactory(String.format("copycat-client-%s", partition.getId())))
.build();
- client.serializer().resolve(new ResourceManagerTypeResolver());
+ copycatClient.serializer().resolve(new ResourceManagerTypeResolver());
for (ResourceType type : registry.types()) {
try {
- type.factory().newInstance().createSerializableTypeResolver().resolve(client.serializer().registry());
+ type.factory()
+ .newInstance()
+ .createSerializableTypeResolver()
+ .resolve(copycatClient.serializer().registry());
} catch (InstantiationException | IllegalAccessException e) {
throw new ResourceManagerException(e);
}
}
- return client;
+ return new ResourceClient(new QueryRetryingCopycatClient(copycatClient, 2, 100));
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 94ede8e..f0c0609 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -22,7 +22,7 @@
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
-import io.atomix.manager.state.ResourceManagerState;
+import io.atomix.manager.internal.ResourceManagerState;
import io.atomix.manager.util.ResourceManagerTypeResolver;
import java.io.File;
@@ -68,9 +68,9 @@
return CompletableFuture.completedFuture(null);
}
synchronized (this) {
- server = buildServer(partition.getMemberAddresses());
+ server = buildServer();
}
- serverOpenFuture = server.start();
+ serverOpenFuture = server.bootstrap(partition.getMemberAddresses());
} else {
serverOpenFuture = CompletableFuture.completedFuture(null);
}
@@ -85,11 +85,7 @@
@Override
public CompletableFuture<Void> close() {
- /**
- * CopycatServer#kill just shuts down the server and does not result
- * in any cluster membership changes.
- */
- return server.kill();
+ return server.shutdown();
}
/**
@@ -97,11 +93,11 @@
* @return future that is completed when the operation is complete
*/
public CompletableFuture<Void> closeAndExit() {
- return server.stop();
+ return server.leave();
}
- private CopycatServer buildServer(Collection<Address> clusterMembers) {
- CopycatServer server = CopycatServer.builder(localAddress, clusterMembers)
+ private CopycatServer buildServer() {
+ CopycatServer server = CopycatServer.builder(localAddress)
.withName("partition-" + partition.getId())
.withSerializer(serializer.clone())
.withTransport(transport.get())
@@ -118,9 +114,8 @@
}
public CompletableFuture<Void> join(Collection<Address> otherMembers) {
- server = buildServer(otherMembers);
-
- return server.start().whenComplete((r, e) -> {
+ server = buildServer();
+ return server.join(otherMembers).whenComplete((r, e) -> {
if (e == null) {
log.info("Successfully joined partition {}", partition.getId());
} else {