blob: 5fa738305023de035e6457b81b48eda2ee5c0a4a [file] [log] [blame]
Madan Jampani15b8ef52016-02-02 17:35:05 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani15b8ef52016-02-02 17:35:05 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
Jordan Halterman2bf177c2017-06-29 01:49:08 -070018import java.time.Duration;
Jordan Halterman046faeb2017-05-01 15:10:13 -070019import java.util.Set;
20import java.util.concurrent.CompletableFuture;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070021import java.util.stream.Collectors;
Jordan Halterman046faeb2017-05-01 15:10:13 -070022
Jordan Haltermanc955df72017-02-04 20:43:28 -080023import com.google.common.base.Suppliers;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070024import io.atomix.protocols.raft.RaftClient;
25import io.atomix.protocols.raft.ReadConsistency;
26import io.atomix.protocols.raft.cluster.MemberId;
27import io.atomix.protocols.raft.protocol.RaftClientProtocol;
28import io.atomix.protocols.raft.proxy.CommunicationStrategy;
Jordan Halterman2c045992018-03-20 21:33:00 -070029import io.atomix.protocols.raft.service.PropagationStrategy;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070030import io.atomix.protocols.raft.session.RaftSessionMetadata;
Madan Jampani15b8ef52016-02-02 17:35:05 -080031import org.onlab.util.HexString;
32import org.onosproject.store.primitives.DistributedPrimitiveCreator;
Jordan Haltermanc955df72017-02-04 20:43:28 -080033import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMap;
Madan Jampani15b8ef52016-02-02 17:35:05 -080034import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
Aaron Kruglikov61582a02016-09-06 13:18:58 -070035import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimap;
Aaron Kruglikoved88ff62016-08-01 16:02:09 -070036import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMap;
Madan Jampani15b8ef52016-02-02 17:35:05 -080037import org.onosproject.store.primitives.resources.impl.AtomixCounter;
Jordan Halterman47432582018-01-25 16:56:45 -080038import org.onosproject.store.primitives.resources.impl.AtomixDistributedLock;
Madan Jampani2914e4e2016-09-13 17:48:56 -070039import org.onosproject.store.primitives.resources.impl.AtomixDocumentTree;
Jordan Halterman5a1053e2017-05-19 18:03:47 -070040import org.onosproject.store.primitives.resources.impl.AtomixIdGenerator;
Madan Jampani39fff102016-02-14 13:17:28 -080041import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
Madan Jampani35708a92016-07-06 10:48:19 -070042import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
Madan Jampani15b8ef52016-02-02 17:35:05 -080043import org.onosproject.store.serializers.KryoNamespaces;
44import org.onosproject.store.service.AsyncAtomicCounter;
Jordan Haltermanc955df72017-02-04 20:43:28 -080045import org.onosproject.store.service.AsyncAtomicCounterMap;
Jordan Halterman5a1053e2017-05-19 18:03:47 -070046import org.onosproject.store.service.AsyncAtomicIdGenerator;
Madan Jampani15b8ef52016-02-02 17:35:05 -080047import org.onosproject.store.service.AsyncAtomicValue;
48import org.onosproject.store.service.AsyncConsistentMap;
Aaron Kruglikov61582a02016-09-06 13:18:58 -070049import org.onosproject.store.service.AsyncConsistentMultimap;
Aaron Kruglikoved88ff62016-08-01 16:02:09 -070050import org.onosproject.store.service.AsyncConsistentTreeMap;
Jordan Halterman47432582018-01-25 16:56:45 -080051import org.onosproject.store.service.AsyncDistributedLock;
Madan Jampani15b8ef52016-02-02 17:35:05 -080052import org.onosproject.store.service.AsyncDistributedSet;
Madan Jampani2914e4e2016-09-13 17:48:56 -070053import org.onosproject.store.service.AsyncDocumentTree;
Madan Jampani15b8ef52016-02-02 17:35:05 -080054import org.onosproject.store.service.AsyncLeaderElector;
Jordan Halterman2c045992018-03-20 21:33:00 -070055import org.onosproject.store.service.AtomicCounterMapOptions;
56import org.onosproject.store.service.AtomicCounterOptions;
57import org.onosproject.store.service.AtomicIdGeneratorOptions;
58import org.onosproject.store.service.AtomicValueOptions;
59import org.onosproject.store.service.ConsistentMapOptions;
60import org.onosproject.store.service.ConsistentMultimapOptions;
61import org.onosproject.store.service.ConsistentTreeMapOptions;
62import org.onosproject.store.service.DistributedLockOptions;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070063import org.onosproject.store.service.DistributedPrimitive;
Jordan Halterman2c045992018-03-20 21:33:00 -070064import org.onosproject.store.service.DistributedSetOptions;
65import org.onosproject.store.service.DocumentTreeOptions;
66import org.onosproject.store.service.LeaderElectorOptions;
Madan Jampaniccdf9da2016-05-05 14:37:27 -070067import org.onosproject.store.service.PartitionClientInfo;
Madan Jampani15b8ef52016-02-02 17:35:05 -080068import org.onosproject.store.service.Serializer;
Madan Jampani819d61d2016-07-25 20:29:43 -070069import org.onosproject.store.service.WorkQueue;
Jordan Halterman2c045992018-03-20 21:33:00 -070070import org.onosproject.store.service.WorkQueueOptions;
Madan Jampani2f9cc712016-02-15 19:36:21 -080071import org.slf4j.Logger;
Madan Jampani15b8ef52016-02-02 17:35:05 -080072
Jordan Haltermanc955df72017-02-04 20:43:28 -080073import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani15b8ef52016-02-02 17:35:05 -080074
75/**
76 * StoragePartition client.
77 */
78public class StoragePartitionClient implements DistributedPrimitiveCreator, Managed<StoragePartitionClient> {
79
Jordan Halterman21249352018-01-23 12:35:09 -080080 private static final int MAX_RETRIES = 8;
81 private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
82
Jordan Halterman153449c2018-04-05 18:31:26 -070083 private static final String MIN_TIMEOUT_PROPERTY = "onos.cluster.raft.client.minTimeoutMillis";
84 private static final String MAX_TIMEOUT_PROPERTY = "onos.cluster.raft.client.maxTimeoutMillis";
85
86 private static final Duration MIN_TIMEOUT;
87 private static final Duration MAX_TIMEOUT;
88
89 private static final long DEFAULT_MIN_TIMEOUT_MILLIS = 5000;
90 private static final long DEFAULT_MAX_TIMEOUT_MILLIS = 30000;
91
92 static {
93 Duration minTimeout;
94 try {
95 minTimeout = Duration.ofMillis(Long.parseLong(
96 System.getProperty(MIN_TIMEOUT_PROPERTY,
97 String.valueOf(DEFAULT_MIN_TIMEOUT_MILLIS))));
98 } catch (NumberFormatException e) {
99 minTimeout = Duration.ofMillis(DEFAULT_MIN_TIMEOUT_MILLIS);
100 }
101 MIN_TIMEOUT = minTimeout;
102
103 Duration maxTimeout;
104 try {
105 maxTimeout = Duration.ofMillis(Long.parseLong(
106 System.getProperty(MAX_TIMEOUT_PROPERTY,
107 String.valueOf(DEFAULT_MAX_TIMEOUT_MILLIS))));
108 } catch (NumberFormatException e) {
109 maxTimeout = Duration.ofMillis(DEFAULT_MAX_TIMEOUT_MILLIS);
110 }
111 MAX_TIMEOUT = maxTimeout;
112 }
113
Madan Jampani2f9cc712016-02-15 19:36:21 -0800114 private final Logger log = getLogger(getClass());
115
Madan Jampani15b8ef52016-02-02 17:35:05 -0800116 private final StoragePartition partition;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700117 private final MemberId localMemberId;
118 private final RaftClientProtocol protocol;
119 private RaftClient client;
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700120 private final com.google.common.base.Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
Madan Jampani15b8ef52016-02-02 17:35:05 -0800121 Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
122 Serializer.using(KryoNamespaces.BASIC)));
123
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700124 public StoragePartitionClient(StoragePartition partition, MemberId localMemberId, RaftClientProtocol protocol) {
Madan Jampani15b8ef52016-02-02 17:35:05 -0800125 this.partition = partition;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700126 this.localMemberId = localMemberId;
127 this.protocol = protocol;
Madan Jampani15b8ef52016-02-02 17:35:05 -0800128 }
129
130 @Override
131 public CompletableFuture<Void> open() {
Madan Jampani15b8ef52016-02-02 17:35:05 -0800132 synchronized (StoragePartitionClient.this) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700133 client = newRaftClient(protocol);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800134 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700135 return client.connect(partition.getMemberIds()).whenComplete((r, e) -> {
Madan Jampani2f9cc712016-02-15 19:36:21 -0800136 if (e == null) {
137 log.info("Successfully started client for partition {}", partition.getId());
138 } else {
139 log.info("Failed to start client for partition {}", partition.getId(), e);
140 }
141 }).thenApply(v -> null);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800142 }
143
144 @Override
145 public CompletableFuture<Void> close() {
146 return client != null ? client.close() : CompletableFuture.completedFuture(null);
147 }
148
149 @Override
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700150 @SuppressWarnings("unchecked")
Jordan Halterman2c045992018-03-20 21:33:00 -0700151 public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(ConsistentMapOptions options) {
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700152 AtomixConsistentMap rawMap =
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700153 new AtomixConsistentMap(client.newProxyBuilder()
Jordan Halterman2c045992018-03-20 21:33:00 -0700154 .withName(options.name())
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700155 .withServiceType(DistributedPrimitive.Type.CONSISTENT_MAP.name())
156 .withReadConsistency(ReadConsistency.SEQUENTIAL)
157 .withCommunicationStrategy(CommunicationStrategy.ANY)
Jordan Halterman153449c2018-04-05 18:31:26 -0700158 .withMinTimeout(MIN_TIMEOUT)
159 .withMaxTimeout(MAX_TIMEOUT)
Jordan Halterman21249352018-01-23 12:35:09 -0800160 .withMaxRetries(MAX_RETRIES)
Jordan Halterman45008172018-03-19 16:40:31 -0700161 .withRevision(options.version() != null && options.revisionType() != null
162 ? options.version().toInt() : 1)
163 .withPropagationStrategy(options.revisionType() != null
164 ? PropagationStrategy.valueOf(options.revisionType().name())
165 : PropagationStrategy.NONE)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700166 .build()
167 .open()
168 .join());
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700169
Jordan Halterman2c045992018-03-20 21:33:00 -0700170 if (options.serializer() != null) {
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700171 return DistributedPrimitives.newTranscodingMap(rawMap,
Jordan Halterman2c045992018-03-20 21:33:00 -0700172 key -> HexString.toHexString(options.serializer().encode(key)),
173 string -> options.serializer().decode(HexString.fromHexString(string)),
174 value -> value == null ? null : options.serializer().encode(value),
175 bytes -> options.serializer().decode(bytes));
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700176 }
177 return (AsyncConsistentMap<K, V>) rawMap;
Madan Jampani15b8ef52016-02-02 17:35:05 -0800178 }
179
180 @Override
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700181 @SuppressWarnings("unchecked")
Jordan Halterman2c045992018-03-20 21:33:00 -0700182 public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(ConsistentTreeMapOptions options) {
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700183 AtomixConsistentTreeMap rawMap =
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700184 new AtomixConsistentTreeMap(client.newProxyBuilder()
Jordan Halterman2c045992018-03-20 21:33:00 -0700185 .withName(options.name())
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700186 .withServiceType(DistributedPrimitive.Type.CONSISTENT_TREEMAP.name())
187 .withReadConsistency(ReadConsistency.SEQUENTIAL)
188 .withCommunicationStrategy(CommunicationStrategy.ANY)
Jordan Halterman153449c2018-04-05 18:31:26 -0700189 .withMinTimeout(MIN_TIMEOUT)
190 .withMaxTimeout(MAX_TIMEOUT)
Jordan Halterman21249352018-01-23 12:35:09 -0800191 .withMaxRetries(MAX_RETRIES)
Jordan Halterman45008172018-03-19 16:40:31 -0700192 .withRevision(options.version() != null && options.revisionType() != null
193 ? options.version().toInt() : 1)
194 .withPropagationStrategy(options.revisionType() != null
195 ? PropagationStrategy.valueOf(options.revisionType().name())
196 : PropagationStrategy.NONE)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700197 .build()
198 .open()
199 .join());
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700200
Jordan Halterman2c045992018-03-20 21:33:00 -0700201 if (options.serializer() != null) {
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700202 return DistributedPrimitives.newTranscodingTreeMap(
203 rawMap,
Jordan Halterman2c045992018-03-20 21:33:00 -0700204 value -> value == null ? null : options.serializer().encode(value),
205 bytes -> options.serializer().decode(bytes));
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700206 }
207 return (AsyncConsistentTreeMap<V>) rawMap;
Aaron Kruglikoved88ff62016-08-01 16:02:09 -0700208 }
209
210 @Override
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700211 @SuppressWarnings("unchecked")
Jordan Halterman2c045992018-03-20 21:33:00 -0700212 public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(ConsistentMultimapOptions options) {
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700213 AtomixConsistentSetMultimap rawMap =
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700214 new AtomixConsistentSetMultimap(client.newProxyBuilder()
Jordan Halterman2c045992018-03-20 21:33:00 -0700215 .withName(options.name())
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700216 .withServiceType(DistributedPrimitive.Type.CONSISTENT_MULTIMAP.name())
217 .withReadConsistency(ReadConsistency.SEQUENTIAL)
218 .withCommunicationStrategy(CommunicationStrategy.ANY)
Jordan Halterman153449c2018-04-05 18:31:26 -0700219 .withMinTimeout(MIN_TIMEOUT)
220 .withMaxTimeout(MAX_TIMEOUT)
Jordan Halterman21249352018-01-23 12:35:09 -0800221 .withMaxRetries(MAX_RETRIES)
Jordan Halterman45008172018-03-19 16:40:31 -0700222 .withRevision(options.version() != null && options.revisionType() != null
223 ? options.version().toInt() : 1)
224 .withPropagationStrategy(options.revisionType() != null
225 ? PropagationStrategy.valueOf(options.revisionType().name())
226 : PropagationStrategy.NONE)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700227 .build()
228 .open()
229 .join());
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700230
Jordan Halterman2c045992018-03-20 21:33:00 -0700231 if (options.serializer() != null) {
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700232 return DistributedPrimitives.newTranscodingMultimap(
233 rawMap,
Jordan Halterman2c045992018-03-20 21:33:00 -0700234 key -> HexString.toHexString(options.serializer().encode(key)),
235 string -> options.serializer().decode(HexString.fromHexString(string)),
236 value -> options.serializer().encode(value),
237 bytes -> options.serializer().decode(bytes));
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700238 }
239 return (AsyncConsistentMultimap<K, V>) rawMap;
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700240 }
241
242 @Override
Jordan Halterman2c045992018-03-20 21:33:00 -0700243 public <E> AsyncDistributedSet<E> newAsyncDistributedSet(DistributedSetOptions options) {
244 return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(options.name(), options.serializer()));
Madan Jampani15b8ef52016-02-02 17:35:05 -0800245 }
246
247 @Override
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700248 @SuppressWarnings("unchecked")
Jordan Halterman2c045992018-03-20 21:33:00 -0700249 public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(AtomicCounterMapOptions options) {
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700250 AtomixAtomicCounterMap rawMap = new AtomixAtomicCounterMap(client.newProxyBuilder()
Jordan Halterman2c045992018-03-20 21:33:00 -0700251 .withName(options.name())
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700252 .withServiceType(DistributedPrimitive.Type.COUNTER_MAP.name())
253 .withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
254 .withCommunicationStrategy(CommunicationStrategy.LEADER)
Jordan Halterman153449c2018-04-05 18:31:26 -0700255 .withMinTimeout(MIN_TIMEOUT)
256 .withMaxTimeout(MAX_TIMEOUT)
Jordan Halterman21249352018-01-23 12:35:09 -0800257 .withMaxRetries(MAX_RETRIES)
Jordan Halterman45008172018-03-19 16:40:31 -0700258 .withRevision(options.version() != null && options.revisionType() != null
259 ? options.version().toInt() : 1)
260 .withPropagationStrategy(options.revisionType() != null
261 ? PropagationStrategy.valueOf(options.revisionType().name())
262 : PropagationStrategy.NONE)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700263 .build()
264 .open()
265 .join());
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700266
Jordan Halterman2c045992018-03-20 21:33:00 -0700267 if (options.serializer() != null) {
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700268 return DistributedPrimitives.newTranscodingAtomicCounterMap(
269 rawMap,
Jordan Halterman2c045992018-03-20 21:33:00 -0700270 key -> HexString.toHexString(options.serializer().encode(key)),
271 string -> options.serializer().decode(HexString.fromHexString(string)));
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700272 }
273 return (AsyncAtomicCounterMap<K>) rawMap;
Jordan Haltermanc955df72017-02-04 20:43:28 -0800274 }
275
276 @Override
Jordan Halterman2c045992018-03-20 21:33:00 -0700277 public AsyncAtomicCounter newAsyncCounter(AtomicCounterOptions options) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700278 return new AtomixCounter(client.newProxyBuilder()
Jordan Halterman2c045992018-03-20 21:33:00 -0700279 .withName(options.name())
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700280 .withServiceType(DistributedPrimitive.Type.COUNTER.name())
281 .withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
282 .withCommunicationStrategy(CommunicationStrategy.LEADER)
Jordan Halterman153449c2018-04-05 18:31:26 -0700283 .withMinTimeout(MIN_TIMEOUT)
284 .withMaxTimeout(MAX_TIMEOUT)
Jordan Halterman21249352018-01-23 12:35:09 -0800285 .withMaxRetries(MAX_RETRIES)
Jordan Halterman45008172018-03-19 16:40:31 -0700286 .withRevision(options.version() != null && options.revisionType() != null
287 ? options.version().toInt() : 1)
288 .withPropagationStrategy(options.revisionType() != null
289 ? PropagationStrategy.valueOf(options.revisionType().name())
290 : PropagationStrategy.NONE)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700291 .build()
292 .open()
293 .join());
Madan Jampani15b8ef52016-02-02 17:35:05 -0800294 }
295
296 @Override
Jordan Halterman2c045992018-03-20 21:33:00 -0700297 public AsyncAtomicIdGenerator newAsyncIdGenerator(AtomicIdGeneratorOptions options) {
298 return new AtomixIdGenerator(newAsyncCounter(options.name()));
Jordan Halterman5a1053e2017-05-19 18:03:47 -0700299 }
300
301 @Override
Jordan Halterman2c045992018-03-20 21:33:00 -0700302 public <V> AsyncAtomicValue<V> newAsyncAtomicValue(AtomicValueOptions options) {
303 return new DefaultAsyncAtomicValue<>(options.name(), options.serializer(), onosAtomicValuesMap.get());
Madan Jampani15b8ef52016-02-02 17:35:05 -0800304 }
305
306 @Override
Jordan Halterman2c045992018-03-20 21:33:00 -0700307 public <E> WorkQueue<E> newWorkQueue(WorkQueueOptions options) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700308 AtomixWorkQueue atomixWorkQueue = new AtomixWorkQueue(client.newProxyBuilder()
Jordan Halterman2c045992018-03-20 21:33:00 -0700309 .withName(options.name())
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700310 .withServiceType(DistributedPrimitive.Type.WORK_QUEUE.name())
311 .withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
312 .withCommunicationStrategy(CommunicationStrategy.LEADER)
Jordan Halterman153449c2018-04-05 18:31:26 -0700313 .withMinTimeout(MIN_TIMEOUT)
314 .withMaxTimeout(MAX_TIMEOUT)
Jordan Halterman21249352018-01-23 12:35:09 -0800315 .withMaxRetries(MAX_RETRIES)
Jordan Halterman45008172018-03-19 16:40:31 -0700316 .withRevision(options.version() != null && options.revisionType() != null
317 ? options.version().toInt() : 1)
318 .withPropagationStrategy(options.revisionType() != null
319 ? PropagationStrategy.valueOf(options.revisionType().name())
320 : PropagationStrategy.NONE)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700321 .build()
322 .open()
323 .join());
Jordan Halterman2c045992018-03-20 21:33:00 -0700324 return new DefaultDistributedWorkQueue<>(atomixWorkQueue, options.serializer());
Madan Jampani35708a92016-07-06 10:48:19 -0700325 }
326
327 @Override
Jordan Halterman2c045992018-03-20 21:33:00 -0700328 public <V> AsyncDocumentTree<V> newAsyncDocumentTree(DocumentTreeOptions options) {
329 String serviceType = String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), options.ordering());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700330 AtomixDocumentTree atomixDocumentTree = new AtomixDocumentTree(client.newProxyBuilder()
Jordan Halterman2c045992018-03-20 21:33:00 -0700331 .withName(options.name())
332 .withServiceType(serviceType)
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700333 .withReadConsistency(ReadConsistency.SEQUENTIAL)
334 .withCommunicationStrategy(CommunicationStrategy.ANY)
Jordan Halterman153449c2018-04-05 18:31:26 -0700335 .withMinTimeout(MIN_TIMEOUT)
336 .withMaxTimeout(MAX_TIMEOUT)
Jordan Halterman21249352018-01-23 12:35:09 -0800337 .withMaxRetries(MAX_RETRIES)
Jordan Halterman45008172018-03-19 16:40:31 -0700338 .withRevision(options.version() != null && options.revisionType() != null
339 ? options.version().toInt() : 1)
340 .withPropagationStrategy(options.revisionType() != null
341 ? PropagationStrategy.valueOf(options.revisionType().name())
342 : PropagationStrategy.NONE)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700343 .build()
344 .open()
345 .join());
Jordan Halterman2c045992018-03-20 21:33:00 -0700346 return new DefaultDistributedDocumentTree<>(options.name(), atomixDocumentTree, options.serializer());
Madan Jampani2914e4e2016-09-13 17:48:56 -0700347 }
348
349 @Override
Jordan Halterman2c045992018-03-20 21:33:00 -0700350 public AsyncDistributedLock newAsyncDistributedLock(DistributedLockOptions options) {
Jordan Halterman47432582018-01-25 16:56:45 -0800351 return new AtomixDistributedLock(client.newProxyBuilder()
Jordan Halterman2c045992018-03-20 21:33:00 -0700352 .withName(options.name())
Jordan Halterman47432582018-01-25 16:56:45 -0800353 .withServiceType(DistributedPrimitive.Type.LOCK.name())
354 .withReadConsistency(ReadConsistency.LINEARIZABLE)
355 .withCommunicationStrategy(CommunicationStrategy.LEADER)
Jordan Halterman153449c2018-04-05 18:31:26 -0700356 .withMinTimeout(MIN_TIMEOUT)
357 .withMaxTimeout(MIN_TIMEOUT)
Jordan Halterman47432582018-01-25 16:56:45 -0800358 .withMaxRetries(MAX_RETRIES)
Jordan Halterman45008172018-03-19 16:40:31 -0700359 .withRevision(options.version() != null && options.revisionType() != null
360 ? options.version().toInt() : 1)
361 .withPropagationStrategy(options.revisionType() != null
362 ? PropagationStrategy.valueOf(options.revisionType().name())
363 : PropagationStrategy.NONE)
Jordan Halterman47432582018-01-25 16:56:45 -0800364 .build()
365 .open()
366 .join());
367 }
368
369 @Override
Jordan Halterman2c045992018-03-20 21:33:00 -0700370 public AsyncLeaderElector newAsyncLeaderElector(LeaderElectorOptions options) {
Jordan Halterman46c5eaa2018-01-24 16:46:55 -0800371 return new AtomixLeaderElector(client.newProxyBuilder()
Jordan Halterman2c045992018-03-20 21:33:00 -0700372 .withName(options.name())
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700373 .withServiceType(DistributedPrimitive.Type.LEADER_ELECTOR.name())
374 .withReadConsistency(ReadConsistency.LINEARIZABLE)
375 .withCommunicationStrategy(CommunicationStrategy.LEADER)
Jordan Halterman2c045992018-03-20 21:33:00 -0700376 .withMinTimeout(Duration.ofMillis(options.electionTimeoutMillis()))
Jordan Halterman153449c2018-04-05 18:31:26 -0700377 .withMaxTimeout(MIN_TIMEOUT)
Jordan Halterman21249352018-01-23 12:35:09 -0800378 .withMaxRetries(MAX_RETRIES)
Jordan Halterman45008172018-03-19 16:40:31 -0700379 .withRevision(options.version() != null && options.revisionType() != null
380 ? options.version().toInt() : 1)
381 .withPropagationStrategy(options.revisionType() != null
382 ? PropagationStrategy.valueOf(options.revisionType().name())
383 : PropagationStrategy.NONE)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700384 .build()
385 .open()
386 .join());
Madan Jampani15b8ef52016-02-02 17:35:05 -0800387 }
388
389 @Override
Madan Jampanie14a09c2016-02-11 10:43:21 -0800390 public Set<String> getAsyncConsistentMapNames() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700391 return client.metadata().getSessions(DistributedPrimitive.Type.CONSISTENT_MAP.name())
392 .join()
393 .stream()
394 .map(RaftSessionMetadata::serviceName)
395 .collect(Collectors.toSet());
Madan Jampanie14a09c2016-02-11 10:43:21 -0800396 }
397
398 @Override
399 public Set<String> getAsyncAtomicCounterNames() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700400 return client.metadata().getSessions(DistributedPrimitive.Type.COUNTER.name())
401 .join()
402 .stream()
403 .map(RaftSessionMetadata::serviceName)
404 .collect(Collectors.toSet());
Madan Jampanie14a09c2016-02-11 10:43:21 -0800405 }
406
407 @Override
Madan Jampani35708a92016-07-06 10:48:19 -0700408 public Set<String> getWorkQueueNames() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700409 return client.metadata().getSessions(DistributedPrimitive.Type.WORK_QUEUE.name())
410 .join()
411 .stream()
412 .map(RaftSessionMetadata::serviceName)
413 .collect(Collectors.toSet());
Madan Jampani35708a92016-07-06 10:48:19 -0700414 }
415
416 @Override
Madan Jampani15b8ef52016-02-02 17:35:05 -0800417 public boolean isOpen() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700418 return client != null;
Madan Jampani15b8ef52016-02-02 17:35:05 -0800419 }
Madan Jampani471a7bc2016-04-27 15:51:38 -0700420
Madan Jampaniccdf9da2016-05-05 14:37:27 -0700421 /**
422 * Returns the {@link PartitionClientInfo information} for this client.
423 * @return partition client information
424 */
425 public PartitionClientInfo clientInfo() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700426 return new PartitionClientInfo(partition.getId(), partition.getMembers());
Madan Jampaniccdf9da2016-05-05 14:37:27 -0700427 }
428
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700429 private RaftClient newRaftClient(RaftClientProtocol protocol) {
430 return RaftClient.newBuilder()
431 .withClientId("partition-" + partition.getId())
Jordan Halterman980a8c12017-09-22 18:01:19 -0700432 .withMemberId(localMemberId)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700433 .withProtocol(protocol)
Madan Jampani471a7bc2016-04-27 15:51:38 -0700434 .build();
Madan Jampanid5b200f2016-06-06 17:15:25 -0700435 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800436}