Support for reacting to underlying copycat client session state changes
Change-Id: If8af43f81963653da3584167d7a9813456ce3773
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index d0bc2f7..230060a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -18,9 +18,22 @@
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
+import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Transport;
+import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
+import io.atomix.copycat.client.ConnectionStrategies;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.client.RecoveryStrategies;
+import io.atomix.copycat.client.RetryStrategies;
+import io.atomix.copycat.client.ServerSelectionStrategies;
+import io.atomix.manager.ResourceClient;
+import io.atomix.manager.state.ResourceManagerException;
+import io.atomix.manager.util.ResourceManagerTypeResolver;
+import io.atomix.resource.ResourceType;
+import io.atomix.resource.util.ResourceRegistry;
import io.atomix.variables.DistributedLong;
+import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -53,6 +66,7 @@
private final Transport transport;
private final io.atomix.catalyst.serializer.Serializer serializer;
private Atomix client;
+ private CopycatClient copycatClient;
private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
@@ -72,11 +86,12 @@
return CompletableFuture.completedFuture(null);
}
synchronized (StoragePartitionClient.this) {
- client = AtomixClient.builder(partition.getMemberAddresses())
- .withResourceTypes(StoragePartition.RESOURCE_TYPES)
- .withSerializer(serializer.clone())
- .withTransport(transport)
- .build();
+ copycatClient = newCopycatClient(partition.getMemberAddresses(),
+ transport,
+ serializer.clone(),
+ StoragePartition.RESOURCE_TYPES);
+ copycatClient.onStateChange(state -> log.info("Client state {}", state));
+ client = new AtomixClient(new ResourceClient(copycatClient));
}
return client.open().whenComplete((r, e) -> {
if (e == null) {
@@ -154,4 +169,30 @@
public boolean isOpen() {
return client.isOpen();
}
+
+ private CopycatClient newCopycatClient(Collection<Address> members,
+ Transport transport,
+ io.atomix.catalyst.serializer.Serializer serializer,
+ Collection<ResourceType> resourceTypes) {
+ ResourceRegistry registry = new ResourceRegistry();
+ resourceTypes.forEach(registry::register);
+ CopycatClient client = CopycatClient.builder(members)
+ .withServerSelectionStrategy(ServerSelectionStrategies.ANY)
+ .withConnectionStrategy(ConnectionStrategies.FIBONACCI_BACKOFF)
+ .withRecoveryStrategy(RecoveryStrategies.RECOVER)
+ .withRetryStrategy(RetryStrategies.FIBONACCI_BACKOFF)
+ .withTransport(transport)
+ .withSerializer(serializer)
+ .withThreadFactory(new CatalystThreadFactory(String.format("copycat-client-%s", partition.getId())))
+ .build();
+ client.serializer().resolve(new ResourceManagerTypeResolver());
+ for (ResourceType type : registry.types()) {
+ try {
+ type.factory().newInstance().createSerializableTypeResolver().resolve(client.serializer().registry());
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new ResourceManagerException(e);
+ }
+ }
+ return client;
+ }
}