Support for reacting to underlying copycat client session state changes

Change-Id: If8af43f81963653da3584167d7a9813456ce3773
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index d0bc2f7..230060a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -18,9 +18,22 @@
 import static org.slf4j.LoggerFactory.getLogger;
 import io.atomix.Atomix;
 import io.atomix.AtomixClient;
+import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Transport;
+import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
+import io.atomix.copycat.client.ConnectionStrategies;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.client.RecoveryStrategies;
+import io.atomix.copycat.client.RetryStrategies;
+import io.atomix.copycat.client.ServerSelectionStrategies;
+import io.atomix.manager.ResourceClient;
+import io.atomix.manager.state.ResourceManagerException;
+import io.atomix.manager.util.ResourceManagerTypeResolver;
+import io.atomix.resource.ResourceType;
+import io.atomix.resource.util.ResourceRegistry;
 import io.atomix.variables.DistributedLong;
 
+import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
@@ -53,6 +66,7 @@
     private final Transport transport;
     private final io.atomix.catalyst.serializer.Serializer serializer;
     private Atomix client;
+    private CopycatClient copycatClient;
     private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
     private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
             Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
@@ -72,11 +86,12 @@
             return CompletableFuture.completedFuture(null);
         }
         synchronized (StoragePartitionClient.this) {
-            client = AtomixClient.builder(partition.getMemberAddresses())
-                                .withResourceTypes(StoragePartition.RESOURCE_TYPES)
-                                .withSerializer(serializer.clone())
-                                .withTransport(transport)
-                                .build();
+            copycatClient = newCopycatClient(partition.getMemberAddresses(),
+                                             transport,
+                                             serializer.clone(),
+                                             StoragePartition.RESOURCE_TYPES);
+            copycatClient.onStateChange(state -> log.info("Client state {}", state));
+            client = new AtomixClient(new ResourceClient(copycatClient));
         }
         return client.open().whenComplete((r, e) -> {
             if (e == null) {
@@ -154,4 +169,30 @@
     public boolean isOpen() {
         return client.isOpen();
     }
+
+    private CopycatClient newCopycatClient(Collection<Address> members,
+                                           Transport transport,
+                                           io.atomix.catalyst.serializer.Serializer serializer,
+                                           Collection<ResourceType> resourceTypes) {
+        ResourceRegistry registry = new ResourceRegistry();
+        resourceTypes.forEach(registry::register);
+        CopycatClient client = CopycatClient.builder(members)
+                .withServerSelectionStrategy(ServerSelectionStrategies.ANY)
+                .withConnectionStrategy(ConnectionStrategies.FIBONACCI_BACKOFF)
+                .withRecoveryStrategy(RecoveryStrategies.RECOVER)
+                .withRetryStrategy(RetryStrategies.FIBONACCI_BACKOFF)
+                .withTransport(transport)
+                .withSerializer(serializer)
+                .withThreadFactory(new CatalystThreadFactory(String.format("copycat-client-%s", partition.getId())))
+                .build();
+        client.serializer().resolve(new ResourceManagerTypeResolver());
+        for (ResourceType type : registry.types()) {
+            try {
+                type.factory().newInstance().createSerializableTypeResolver().resolve(client.serializer().registry());
+            } catch (InstantiationException | IllegalAccessException e) {
+                throw new ResourceManagerException(e);
+            }
+        }
+        return client;
+    }
 }