blob: d97868cb1ad7e7e3000956530487798de1e2c7df [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
Jordan Halterman046faeb2017-05-01 15:10:13 -070018import java.util.Collection;
19import java.util.Set;
20import java.util.concurrent.CompletableFuture;
21import java.util.concurrent.Executor;
22import java.util.function.Consumer;
23import java.util.function.Function;
24import java.util.function.Supplier;
25
Jordan Haltermanc955df72017-02-04 20:43:28 -080026import com.google.common.base.Suppliers;
Madan Jampani15b8ef52016-02-02 17:35:05 -080027import io.atomix.AtomixClient;
28import io.atomix.catalyst.transport.Transport;
Madan Jampani471a7bc2016-04-27 15:51:38 -070029import io.atomix.copycat.client.ConnectionStrategies;
30import io.atomix.copycat.client.CopycatClient;
Madan Jampani1d3b6172016-04-28 13:22:57 -070031import io.atomix.copycat.client.CopycatClient.State;
Madan Jampani471a7bc2016-04-27 15:51:38 -070032import io.atomix.copycat.client.RecoveryStrategies;
Madan Jampani471a7bc2016-04-27 15:51:38 -070033import io.atomix.copycat.client.ServerSelectionStrategies;
34import io.atomix.manager.ResourceClient;
Madan Jampani630e7ac2016-05-31 11:34:05 -070035import io.atomix.manager.ResourceManagerException;
Madan Jampani471a7bc2016-04-27 15:51:38 -070036import io.atomix.manager.util.ResourceManagerTypeResolver;
Madan Jampani630e7ac2016-05-31 11:34:05 -070037import io.atomix.resource.ResourceRegistry;
Madan Jampani471a7bc2016-04-27 15:51:38 -070038import io.atomix.resource.ResourceType;
Madan Jampani15b8ef52016-02-02 17:35:05 -080039import io.atomix.variables.DistributedLong;
Madan Jampani15b8ef52016-02-02 17:35:05 -080040import org.onlab.util.HexString;
Jordan Halterman046faeb2017-05-01 15:10:13 -070041import org.onlab.util.OrderedExecutor;
Madan Jampani15b8ef52016-02-02 17:35:05 -080042import org.onosproject.store.primitives.DistributedPrimitiveCreator;
Jordan Haltermanc955df72017-02-04 20:43:28 -080043import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMap;
Madan Jampani15b8ef52016-02-02 17:35:05 -080044import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
Aaron Kruglikov61582a02016-09-06 13:18:58 -070045import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimap;
Aaron Kruglikoved88ff62016-08-01 16:02:09 -070046import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMap;
Madan Jampani15b8ef52016-02-02 17:35:05 -080047import org.onosproject.store.primitives.resources.impl.AtomixCounter;
Madan Jampani2914e4e2016-09-13 17:48:56 -070048import org.onosproject.store.primitives.resources.impl.AtomixDocumentTree;
Jordan Halterman5a1053e2017-05-19 18:03:47 -070049import org.onosproject.store.primitives.resources.impl.AtomixIdGenerator;
Madan Jampani39fff102016-02-14 13:17:28 -080050import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
Madan Jampani35708a92016-07-06 10:48:19 -070051import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
Madan Jampani15b8ef52016-02-02 17:35:05 -080052import org.onosproject.store.serializers.KryoNamespaces;
53import org.onosproject.store.service.AsyncAtomicCounter;
Jordan Haltermanc955df72017-02-04 20:43:28 -080054import org.onosproject.store.service.AsyncAtomicCounterMap;
Jordan Halterman5a1053e2017-05-19 18:03:47 -070055import org.onosproject.store.service.AsyncAtomicIdGenerator;
Madan Jampani15b8ef52016-02-02 17:35:05 -080056import org.onosproject.store.service.AsyncAtomicValue;
57import org.onosproject.store.service.AsyncConsistentMap;
Aaron Kruglikov61582a02016-09-06 13:18:58 -070058import org.onosproject.store.service.AsyncConsistentMultimap;
Aaron Kruglikoved88ff62016-08-01 16:02:09 -070059import org.onosproject.store.service.AsyncConsistentTreeMap;
Madan Jampani15b8ef52016-02-02 17:35:05 -080060import org.onosproject.store.service.AsyncDistributedSet;
Madan Jampani2914e4e2016-09-13 17:48:56 -070061import org.onosproject.store.service.AsyncDocumentTree;
Madan Jampani15b8ef52016-02-02 17:35:05 -080062import org.onosproject.store.service.AsyncLeaderElector;
Madan Jampani1d3b6172016-04-28 13:22:57 -070063import org.onosproject.store.service.DistributedPrimitive.Status;
Madan Jampaniccdf9da2016-05-05 14:37:27 -070064import org.onosproject.store.service.PartitionClientInfo;
Madan Jampani15b8ef52016-02-02 17:35:05 -080065import org.onosproject.store.service.Serializer;
Madan Jampani819d61d2016-07-25 20:29:43 -070066import org.onosproject.store.service.WorkQueue;
Madan Jampani2f9cc712016-02-15 19:36:21 -080067import org.slf4j.Logger;
Madan Jampani15b8ef52016-02-02 17:35:05 -080068
Jordan Haltermanc955df72017-02-04 20:43:28 -080069import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani15b8ef52016-02-02 17:35:05 -080070
71/**
72 * StoragePartition client.
73 */
74public class StoragePartitionClient implements DistributedPrimitiveCreator, Managed<StoragePartitionClient> {
75
Madan Jampani2f9cc712016-02-15 19:36:21 -080076 private final Logger log = getLogger(getClass());
77
Madan Jampani15b8ef52016-02-02 17:35:05 -080078 private final StoragePartition partition;
79 private final Transport transport;
80 private final io.atomix.catalyst.serializer.Serializer serializer;
Jordan Halterman9bdc24f2017-04-19 23:45:12 -070081 private final Executor sharedExecutor;
Madan Jampani630e7ac2016-05-31 11:34:05 -070082 private AtomixClient client;
83 private ResourceClient resourceClient;
Madan Jampani15b8ef52016-02-02 17:35:05 -080084 private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
Jordan Halterman9bdc24f2017-04-19 23:45:12 -070085 private final com.google.common.base.Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
Madan Jampani15b8ef52016-02-02 17:35:05 -080086 Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
87 Serializer.using(KryoNamespaces.BASIC)));
Madan Jampani1d3b6172016-04-28 13:22:57 -070088 Function<State, Status> mapper = state -> {
89 switch (state) {
90 case CONNECTED:
91 return Status.ACTIVE;
92 case SUSPENDED:
93 return Status.SUSPENDED;
94 case CLOSED:
95 return Status.INACTIVE;
96 default:
97 throw new IllegalStateException("Unknown state " + state);
98 }
99 };
Madan Jampani15b8ef52016-02-02 17:35:05 -0800100
101 public StoragePartitionClient(StoragePartition partition,
102 io.atomix.catalyst.serializer.Serializer serializer,
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700103 Transport transport,
104 Executor sharedExecutor) {
Madan Jampani15b8ef52016-02-02 17:35:05 -0800105 this.partition = partition;
106 this.serializer = serializer;
107 this.transport = transport;
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700108 this.sharedExecutor = sharedExecutor;
Madan Jampani15b8ef52016-02-02 17:35:05 -0800109 }
110
111 @Override
112 public CompletableFuture<Void> open() {
Madan Jampani15b8ef52016-02-02 17:35:05 -0800113 synchronized (StoragePartitionClient.this) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700114 resourceClient = newResourceClient(transport,
Madan Jampani471a7bc2016-04-27 15:51:38 -0700115 serializer.clone(),
116 StoragePartition.RESOURCE_TYPES);
Madan Jampani630e7ac2016-05-31 11:34:05 -0700117 resourceClient.client().onStateChange(state -> log.debug("Partition {} client state"
Madan Jampani1d3b6172016-04-28 13:22:57 -0700118 + " changed to {}", partition.getId(), state));
Madan Jampani630e7ac2016-05-31 11:34:05 -0700119 client = new AtomixClient(resourceClient);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800120 }
Madan Jampani630e7ac2016-05-31 11:34:05 -0700121 return client.connect(partition.getMemberAddresses()).whenComplete((r, e) -> {
Madan Jampani2f9cc712016-02-15 19:36:21 -0800122 if (e == null) {
123 log.info("Successfully started client for partition {}", partition.getId());
124 } else {
125 log.info("Failed to start client for partition {}", partition.getId(), e);
126 }
127 }).thenApply(v -> null);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800128 }
129
130 @Override
131 public CompletableFuture<Void> close() {
132 return client != null ? client.close() : CompletableFuture.completedFuture(null);
133 }
134
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700135 /**
136 * Returns the executor provided by the given supplier or a serial executor if the supplier is {@code null}.
137 *
138 * @param executorSupplier the user-provided executor supplier
139 * @return the executor
140 */
141 private Executor defaultExecutor(Supplier<Executor> executorSupplier) {
Jordan Halterman046faeb2017-05-01 15:10:13 -0700142 return executorSupplier != null ? executorSupplier.get() : new OrderedExecutor(sharedExecutor);
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700143 }
144
Madan Jampani15b8ef52016-02-02 17:35:05 -0800145 @Override
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700146 public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(
147 String name, Serializer serializer, Supplier<Executor> executorSupplier) {
Madan Jampani1d3b6172016-04-28 13:22:57 -0700148 AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join();
149 Consumer<State> statusListener = state -> {
150 atomixConsistentMap.statusChangeListeners()
151 .forEach(listener -> listener.accept(mapper.apply(state)));
152 };
Madan Jampani630e7ac2016-05-31 11:34:05 -0700153 resourceClient.client().onStateChange(statusListener);
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700154
Madan Jampani15b8ef52016-02-02 17:35:05 -0800155 AsyncConsistentMap<String, byte[]> rawMap =
Madan Jampani1d3b6172016-04-28 13:22:57 -0700156 new DelegatingAsyncConsistentMap<String, byte[]>(atomixConsistentMap) {
Madan Jampani15b8ef52016-02-02 17:35:05 -0800157 @Override
158 public String name() {
159 return name;
160 }
161 };
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700162
163 // We have to ensure serialization is done on the Copycat threads since Kryo is not thread safe.
164 AsyncConsistentMap<K, V> transcodedMap = DistributedPrimitives.newTranscodingMap(rawMap,
Jordan Haltermanc955df72017-02-04 20:43:28 -0800165 key -> HexString.toHexString(serializer.encode(key)),
166 string -> serializer.decode(HexString.fromHexString(string)),
167 value -> value == null ? null : serializer.encode(value),
168 bytes -> serializer.decode(bytes));
Madan Jampani15b8ef52016-02-02 17:35:05 -0800169
Jordan Halterman046faeb2017-05-01 15:10:13 -0700170 return new ExecutingAsyncConsistentMap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800171 }
172
173 @Override
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700174 public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(
175 String name, Serializer serializer, Supplier<Executor> executorSupplier) {
Aaron Kruglikoved88ff62016-08-01 16:02:09 -0700176 AtomixConsistentTreeMap atomixConsistentTreeMap =
177 client.getResource(name, AtomixConsistentTreeMap.class).join();
178 Consumer<State> statusListener = state -> {
179 atomixConsistentTreeMap.statusChangeListeners()
180 .forEach(listener -> listener.accept(mapper.apply(state)));
181 };
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700182 resourceClient.client().onStateChange(statusListener);
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700183
Aaron Kruglikoved88ff62016-08-01 16:02:09 -0700184 AsyncConsistentTreeMap<byte[]> rawMap =
185 new DelegatingAsyncConsistentTreeMap<byte[]>(atomixConsistentTreeMap) {
186 @Override
187 public String name() {
vijayin66339982017-01-19 15:04:27 +0530188 return name;
Aaron Kruglikoved88ff62016-08-01 16:02:09 -0700189 }
190 };
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700191
Aaron Kruglikoved88ff62016-08-01 16:02:09 -0700192 AsyncConsistentTreeMap<V> transcodedMap =
193 DistributedPrimitives.<V, byte[]>newTranscodingTreeMap(
Jordan Haltermanc955df72017-02-04 20:43:28 -0800194 rawMap,
195 value -> value == null ? null : serializer.encode(value),
196 bytes -> serializer.decode(bytes));
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700197
Jordan Halterman046faeb2017-05-01 15:10:13 -0700198 return new ExecutingAsyncConsistentTreeMap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor);
Aaron Kruglikoved88ff62016-08-01 16:02:09 -0700199 }
200
201 @Override
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700202 public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700203 String name, Serializer serializer, Supplier<Executor> executorSupplier) {
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700204 AtomixConsistentSetMultimap atomixConsistentSetMultimap =
205 client.getResource(name, AtomixConsistentSetMultimap.class)
206 .join();
207 Consumer<State> statusListener = state -> {
208 atomixConsistentSetMultimap.statusChangeListeners()
209 .forEach(listener -> listener.accept(mapper.apply(state)));
210 };
211 resourceClient.client().onStateChange(statusListener);
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700212
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700213 AsyncConsistentMultimap<String, byte[]> rawMap =
214 new DelegatingAsyncConsistentMultimap<String, byte[]>(
215 atomixConsistentSetMultimap) {
216 @Override
217 public String name() {
218 return super.name();
219 }
220 };
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700221
Jonathan Hartad0c3022017-02-22 14:06:01 -0800222 AsyncConsistentMultimap<K, V> transcodedMap =
223 DistributedPrimitives.newTranscodingMultimap(
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700224 rawMap,
225 key -> HexString.toHexString(serializer.encode(key)),
Jonathan Hartad0c3022017-02-22 14:06:01 -0800226 string -> serializer.decode(HexString.fromHexString(string)),
227 value -> serializer.encode(value),
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700228 bytes -> serializer.decode(bytes));
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700229
Jordan Halterman046faeb2017-05-01 15:10:13 -0700230 return new ExecutingAsyncConsistentMultimap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor);
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700231 }
232
233 @Override
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700234 public <E> AsyncDistributedSet<E> newAsyncDistributedSet(
235 String name, Serializer serializer, Supplier<Executor> executorSupplier) {
236 return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer, executorSupplier));
Madan Jampani15b8ef52016-02-02 17:35:05 -0800237 }
238
239 @Override
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700240 public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
241 String name, Serializer serializer, Supplier<Executor> executorSupplier) {
Jordan Haltermanc955df72017-02-04 20:43:28 -0800242 AtomixAtomicCounterMap atomixAtomicCounterMap =
243 client.getResource(name, AtomixAtomicCounterMap.class)
244 .join();
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700245
Jordan Haltermanc955df72017-02-04 20:43:28 -0800246 AsyncAtomicCounterMap<K> transcodedMap =
247 DistributedPrimitives.<K, String>newTranscodingAtomicCounterMap(
248 atomixAtomicCounterMap,
249 key -> HexString.toHexString(serializer.encode(key)),
250 string -> serializer.decode(HexString.fromHexString(string)));
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700251
Jordan Halterman046faeb2017-05-01 15:10:13 -0700252 return new ExecutingAsyncAtomicCounterMap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor);
Jordan Haltermanc955df72017-02-04 20:43:28 -0800253 }
254
255 @Override
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700256 public AsyncAtomicCounter newAsyncCounter(String name, Supplier<Executor> executorSupplier) {
Madan Jampani65f24bb2016-03-15 15:16:18 -0700257 DistributedLong distributedLong = client.getLong(name).join();
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700258 AsyncAtomicCounter asyncCounter = new AtomixCounter(name, distributedLong);
Jordan Halterman046faeb2017-05-01 15:10:13 -0700259 return new ExecutingAsyncAtomicCounter(asyncCounter, defaultExecutor(executorSupplier), sharedExecutor);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800260 }
261
262 @Override
Jordan Halterman5a1053e2017-05-19 18:03:47 -0700263 public AsyncAtomicIdGenerator newAsyncIdGenerator(String name, Supplier<Executor> executorSupplier) {
264 DistributedLong distributedLong = client.getLong(name).join();
265 AsyncAtomicIdGenerator asyncIdGenerator = new AtomixIdGenerator(name, distributedLong);
266 return new ExecutingAsyncAtomicIdGenerator(asyncIdGenerator, defaultExecutor(executorSupplier), sharedExecutor);
267 }
268
269 @Override
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700270 public <V> AsyncAtomicValue<V> newAsyncAtomicValue(
271 String name, Serializer serializer, Supplier<Executor> executorSupplier) {
272 AsyncAtomicValue<V> asyncValue = new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
Jordan Halterman046faeb2017-05-01 15:10:13 -0700273 return new ExecutingAsyncAtomicValue<>(asyncValue, defaultExecutor(executorSupplier), sharedExecutor);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800274 }
275
276 @Override
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700277 public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer, Supplier<Executor> executorSupplier) {
278 AtomixWorkQueue atomixWorkQueue = client.getResource(name, AtomixWorkQueue.class).join();
279 WorkQueue<E> workQueue = new DefaultDistributedWorkQueue<>(atomixWorkQueue, serializer);
Jordan Halterman046faeb2017-05-01 15:10:13 -0700280 return new ExecutingWorkQueue<>(workQueue, defaultExecutor(executorSupplier), sharedExecutor);
Madan Jampani35708a92016-07-06 10:48:19 -0700281 }
282
283 @Override
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700284 public <V> AsyncDocumentTree<V> newAsyncDocumentTree(
285 String name, Serializer serializer, Supplier<Executor> executorSupplier) {
Madan Jampani2914e4e2016-09-13 17:48:56 -0700286 AtomixDocumentTree atomixDocumentTree = client.getResource(name, AtomixDocumentTree.class).join();
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700287 AsyncDocumentTree<V> asyncDocumentTree = new DefaultDistributedDocumentTree<>(
288 name, atomixDocumentTree, serializer);
Jordan Halterman046faeb2017-05-01 15:10:13 -0700289 return new ExecutingAsyncDocumentTree<>(asyncDocumentTree, defaultExecutor(executorSupplier), sharedExecutor);
Madan Jampani2914e4e2016-09-13 17:48:56 -0700290 }
291
292 @Override
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700293 public AsyncLeaderElector newAsyncLeaderElector(String name, Supplier<Executor> executorSupplier) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700294 AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
295 .thenCompose(AtomixLeaderElector::setupCache)
296 .join();
Jordan Halterman046faeb2017-05-01 15:10:13 -0700297 Consumer<State> statusListener = state -> leaderElector.statusChangeListeners()
298 .forEach(listener -> listener.accept(mapper.apply(state)));
Madan Jampani630e7ac2016-05-31 11:34:05 -0700299 resourceClient.client().onStateChange(statusListener);
Jordan Halterman046faeb2017-05-01 15:10:13 -0700300 return new ExecutingAsyncLeaderElector(leaderElector, defaultExecutor(executorSupplier), sharedExecutor);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800301 }
302
303 @Override
Madan Jampanie14a09c2016-02-11 10:43:21 -0800304 public Set<String> getAsyncConsistentMapNames() {
305 return client.keys(AtomixConsistentMap.class).join();
306 }
307
308 @Override
309 public Set<String> getAsyncAtomicCounterNames() {
310 return client.keys(DistributedLong.class).join();
311 }
312
313 @Override
Madan Jampani35708a92016-07-06 10:48:19 -0700314 public Set<String> getWorkQueueNames() {
315 return client.keys(AtomixWorkQueue.class).join();
316 }
317
318 @Override
Madan Jampani15b8ef52016-02-02 17:35:05 -0800319 public boolean isOpen() {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700320 return resourceClient.client().state() != State.CLOSED;
Madan Jampani15b8ef52016-02-02 17:35:05 -0800321 }
Madan Jampani471a7bc2016-04-27 15:51:38 -0700322
Madan Jampaniccdf9da2016-05-05 14:37:27 -0700323 /**
324 * Returns the {@link PartitionClientInfo information} for this client.
325 * @return partition client information
326 */
327 public PartitionClientInfo clientInfo() {
328 return new PartitionClientInfo(partition.getId(),
329 partition.getMembers(),
Madan Jampani630e7ac2016-05-31 11:34:05 -0700330 resourceClient.client().session().id(),
331 mapper.apply(resourceClient.client().state()));
Madan Jampaniccdf9da2016-05-05 14:37:27 -0700332 }
333
Madan Jampani630e7ac2016-05-31 11:34:05 -0700334 private ResourceClient newResourceClient(Transport transport,
Madan Jampani471a7bc2016-04-27 15:51:38 -0700335 io.atomix.catalyst.serializer.Serializer serializer,
336 Collection<ResourceType> resourceTypes) {
337 ResourceRegistry registry = new ResourceRegistry();
338 resourceTypes.forEach(registry::register);
Madan Jampani630e7ac2016-05-31 11:34:05 -0700339 CopycatClient copycatClient = CopycatClient.builder()
Madan Jampani471a7bc2016-04-27 15:51:38 -0700340 .withServerSelectionStrategy(ServerSelectionStrategies.ANY)
341 .withConnectionStrategy(ConnectionStrategies.FIBONACCI_BACKOFF)
342 .withRecoveryStrategy(RecoveryStrategies.RECOVER)
Madan Jampani471a7bc2016-04-27 15:51:38 -0700343 .withTransport(transport)
344 .withSerializer(serializer)
Madan Jampani471a7bc2016-04-27 15:51:38 -0700345 .build();
Madan Jampani630e7ac2016-05-31 11:34:05 -0700346 copycatClient.serializer().resolve(new ResourceManagerTypeResolver());
Madan Jampani471a7bc2016-04-27 15:51:38 -0700347 for (ResourceType type : registry.types()) {
348 try {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700349 type.factory()
350 .newInstance()
351 .createSerializableTypeResolver()
352 .resolve(copycatClient.serializer().registry());
Madan Jampani471a7bc2016-04-27 15:51:38 -0700353 } catch (InstantiationException | IllegalAccessException e) {
354 throw new ResourceManagerException(e);
355 }
356 }
Madan Jampani21a71492016-06-16 16:53:04 -0700357 return new ResourceClient(new OnosCopycatClient(copycatClient, 5, 100));
Madan Jampanid5b200f2016-06-06 17:15:25 -0700358 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800359}