blob: 6679ea2ec71ad0175a35d4b485d25650a15e1134 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
Jordan Halterman2bf177c2017-06-29 01:49:08 -070018import java.time.Duration;
Jordan Halterman046faeb2017-05-01 15:10:13 -070019import java.util.Set;
20import java.util.concurrent.CompletableFuture;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070021import java.util.stream.Collectors;
Jordan Halterman046faeb2017-05-01 15:10:13 -070022
Jordan Haltermanc955df72017-02-04 20:43:28 -080023import com.google.common.base.Suppliers;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070024import io.atomix.protocols.raft.RaftClient;
25import io.atomix.protocols.raft.ReadConsistency;
26import io.atomix.protocols.raft.cluster.MemberId;
27import io.atomix.protocols.raft.protocol.RaftClientProtocol;
28import io.atomix.protocols.raft.proxy.CommunicationStrategy;
29import io.atomix.protocols.raft.session.RaftSessionMetadata;
Madan Jampani15b8ef52016-02-02 17:35:05 -080030import org.onlab.util.HexString;
31import org.onosproject.store.primitives.DistributedPrimitiveCreator;
Jordan Haltermanc955df72017-02-04 20:43:28 -080032import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMap;
Madan Jampani15b8ef52016-02-02 17:35:05 -080033import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
Aaron Kruglikov61582a02016-09-06 13:18:58 -070034import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimap;
Aaron Kruglikoved88ff62016-08-01 16:02:09 -070035import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMap;
Madan Jampani15b8ef52016-02-02 17:35:05 -080036import org.onosproject.store.primitives.resources.impl.AtomixCounter;
Madan Jampani2914e4e2016-09-13 17:48:56 -070037import org.onosproject.store.primitives.resources.impl.AtomixDocumentTree;
Jordan Halterman5a1053e2017-05-19 18:03:47 -070038import org.onosproject.store.primitives.resources.impl.AtomixIdGenerator;
Madan Jampani39fff102016-02-14 13:17:28 -080039import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
Madan Jampani35708a92016-07-06 10:48:19 -070040import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
Madan Jampani15b8ef52016-02-02 17:35:05 -080041import org.onosproject.store.serializers.KryoNamespaces;
42import org.onosproject.store.service.AsyncAtomicCounter;
Jordan Haltermanc955df72017-02-04 20:43:28 -080043import org.onosproject.store.service.AsyncAtomicCounterMap;
Jordan Halterman5a1053e2017-05-19 18:03:47 -070044import org.onosproject.store.service.AsyncAtomicIdGenerator;
Madan Jampani15b8ef52016-02-02 17:35:05 -080045import org.onosproject.store.service.AsyncAtomicValue;
46import org.onosproject.store.service.AsyncConsistentMap;
Aaron Kruglikov61582a02016-09-06 13:18:58 -070047import org.onosproject.store.service.AsyncConsistentMultimap;
Aaron Kruglikoved88ff62016-08-01 16:02:09 -070048import org.onosproject.store.service.AsyncConsistentTreeMap;
Madan Jampani15b8ef52016-02-02 17:35:05 -080049import org.onosproject.store.service.AsyncDistributedSet;
Madan Jampani2914e4e2016-09-13 17:48:56 -070050import org.onosproject.store.service.AsyncDocumentTree;
Madan Jampani15b8ef52016-02-02 17:35:05 -080051import org.onosproject.store.service.AsyncLeaderElector;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070052import org.onosproject.store.service.DistributedPrimitive;
Madan Jampaniccdf9da2016-05-05 14:37:27 -070053import org.onosproject.store.service.PartitionClientInfo;
Madan Jampani15b8ef52016-02-02 17:35:05 -080054import org.onosproject.store.service.Serializer;
Madan Jampani819d61d2016-07-25 20:29:43 -070055import org.onosproject.store.service.WorkQueue;
Madan Jampani2f9cc712016-02-15 19:36:21 -080056import org.slf4j.Logger;
Madan Jampani15b8ef52016-02-02 17:35:05 -080057
Jordan Haltermanc955df72017-02-04 20:43:28 -080058import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani15b8ef52016-02-02 17:35:05 -080059
60/**
61 * StoragePartition client.
62 */
63public class StoragePartitionClient implements DistributedPrimitiveCreator, Managed<StoragePartitionClient> {
64
Madan Jampani2f9cc712016-02-15 19:36:21 -080065 private final Logger log = getLogger(getClass());
66
Madan Jampani15b8ef52016-02-02 17:35:05 -080067 private final StoragePartition partition;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070068 private final MemberId localMemberId;
69 private final RaftClientProtocol protocol;
70 private RaftClient client;
Madan Jampani15b8ef52016-02-02 17:35:05 -080071 private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
Jordan Halterman9bdc24f2017-04-19 23:45:12 -070072 private final com.google.common.base.Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
Madan Jampani15b8ef52016-02-02 17:35:05 -080073 Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
74 Serializer.using(KryoNamespaces.BASIC)));
75
Jordan Halterman2bf177c2017-06-29 01:49:08 -070076 public StoragePartitionClient(StoragePartition partition, MemberId localMemberId, RaftClientProtocol protocol) {
Madan Jampani15b8ef52016-02-02 17:35:05 -080077 this.partition = partition;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070078 this.localMemberId = localMemberId;
79 this.protocol = protocol;
Madan Jampani15b8ef52016-02-02 17:35:05 -080080 }
81
82 @Override
83 public CompletableFuture<Void> open() {
Madan Jampani15b8ef52016-02-02 17:35:05 -080084 synchronized (StoragePartitionClient.this) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -070085 client = newRaftClient(protocol);
Madan Jampani15b8ef52016-02-02 17:35:05 -080086 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -070087 return client.connect(partition.getMemberIds()).whenComplete((r, e) -> {
Madan Jampani2f9cc712016-02-15 19:36:21 -080088 if (e == null) {
89 log.info("Successfully started client for partition {}", partition.getId());
90 } else {
91 log.info("Failed to start client for partition {}", partition.getId(), e);
92 }
93 }).thenApply(v -> null);
Jordan Halterman2bf177c2017-06-29 01:49:08 -070094
Madan Jampani15b8ef52016-02-02 17:35:05 -080095 }
96
97 @Override
98 public CompletableFuture<Void> close() {
99 return client != null ? client.close() : CompletableFuture.completedFuture(null);
100 }
101
102 @Override
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700103 @SuppressWarnings("unchecked")
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700104 public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700105 AtomixConsistentMap rawMap =
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700106 new AtomixConsistentMap(client.newProxyBuilder()
107 .withName(name)
108 .withServiceType(DistributedPrimitive.Type.CONSISTENT_MAP.name())
109 .withReadConsistency(ReadConsistency.SEQUENTIAL)
110 .withCommunicationStrategy(CommunicationStrategy.ANY)
111 .withTimeout(Duration.ofSeconds(30))
112 .withMaxRetries(5)
113 .build()
114 .open()
115 .join());
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700116
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700117 if (serializer != null) {
118 return DistributedPrimitives.newTranscodingMap(rawMap,
119 key -> HexString.toHexString(serializer.encode(key)),
120 string -> serializer.decode(HexString.fromHexString(string)),
121 value -> value == null ? null : serializer.encode(value),
122 bytes -> serializer.decode(bytes));
123 }
124 return (AsyncConsistentMap<K, V>) rawMap;
Madan Jampani15b8ef52016-02-02 17:35:05 -0800125 }
126
127 @Override
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700128 @SuppressWarnings("unchecked")
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700129 public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer) {
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700130 AtomixConsistentTreeMap rawMap =
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700131 new AtomixConsistentTreeMap(client.newProxyBuilder()
132 .withName(name)
133 .withServiceType(DistributedPrimitive.Type.CONSISTENT_TREEMAP.name())
134 .withReadConsistency(ReadConsistency.SEQUENTIAL)
135 .withCommunicationStrategy(CommunicationStrategy.ANY)
136 .withTimeout(Duration.ofSeconds(30))
137 .withMaxRetries(5)
138 .build()
139 .open()
140 .join());
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700141
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700142 if (serializer != null) {
143 return DistributedPrimitives.newTranscodingTreeMap(
144 rawMap,
145 value -> value == null ? null : serializer.encode(value),
146 bytes -> serializer.decode(bytes));
147 }
148 return (AsyncConsistentTreeMap<V>) rawMap;
Aaron Kruglikoved88ff62016-08-01 16:02:09 -0700149 }
150
151 @Override
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700152 @SuppressWarnings("unchecked")
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700153 public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(String name, Serializer serializer) {
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700154 AtomixConsistentSetMultimap rawMap =
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700155 new AtomixConsistentSetMultimap(client.newProxyBuilder()
156 .withName(name)
157 .withServiceType(DistributedPrimitive.Type.CONSISTENT_MULTIMAP.name())
158 .withReadConsistency(ReadConsistency.SEQUENTIAL)
159 .withCommunicationStrategy(CommunicationStrategy.ANY)
160 .withTimeout(Duration.ofSeconds(30))
161 .withMaxRetries(5)
162 .build()
163 .open()
164 .join());
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700165
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700166 if (serializer != null) {
167 return DistributedPrimitives.newTranscodingMultimap(
168 rawMap,
169 key -> HexString.toHexString(serializer.encode(key)),
170 string -> serializer.decode(HexString.fromHexString(string)),
171 value -> serializer.encode(value),
172 bytes -> serializer.decode(bytes));
173 }
174 return (AsyncConsistentMultimap<K, V>) rawMap;
Aaron Kruglikov61582a02016-09-06 13:18:58 -0700175 }
176
177 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700178 public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
179 return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer));
Madan Jampani15b8ef52016-02-02 17:35:05 -0800180 }
181
182 @Override
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700183 @SuppressWarnings("unchecked")
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700184 public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700185 AtomixAtomicCounterMap rawMap = new AtomixAtomicCounterMap(client.newProxyBuilder()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700186 .withName(name)
187 .withServiceType(DistributedPrimitive.Type.COUNTER_MAP.name())
188 .withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
189 .withCommunicationStrategy(CommunicationStrategy.LEADER)
190 .withTimeout(Duration.ofSeconds(30))
191 .withMaxRetries(5)
192 .build()
193 .open()
194 .join());
Jordan Halterman9bdc24f2017-04-19 23:45:12 -0700195
Jordan Haltermane7f363e2017-07-28 16:52:20 -0700196 if (serializer != null) {
197 return DistributedPrimitives.newTranscodingAtomicCounterMap(
198 rawMap,
199 key -> HexString.toHexString(serializer.encode(key)),
200 string -> serializer.decode(HexString.fromHexString(string)));
201 }
202 return (AsyncAtomicCounterMap<K>) rawMap;
Jordan Haltermanc955df72017-02-04 20:43:28 -0800203 }
204
205 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700206 public AsyncAtomicCounter newAsyncCounter(String name) {
207 return new AtomixCounter(client.newProxyBuilder()
208 .withName(name)
209 .withServiceType(DistributedPrimitive.Type.COUNTER.name())
210 .withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
211 .withCommunicationStrategy(CommunicationStrategy.LEADER)
212 .withTimeout(Duration.ofSeconds(30))
213 .withMaxRetries(5)
214 .build()
215 .open()
216 .join());
Madan Jampani15b8ef52016-02-02 17:35:05 -0800217 }
218
219 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700220 public AsyncAtomicIdGenerator newAsyncIdGenerator(String name) {
221 return new AtomixIdGenerator(newAsyncCounter(name));
Jordan Halterman5a1053e2017-05-19 18:03:47 -0700222 }
223
224 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700225 public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
226 return new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
Madan Jampani15b8ef52016-02-02 17:35:05 -0800227 }
228
229 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700230 public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
231 AtomixWorkQueue atomixWorkQueue = new AtomixWorkQueue(client.newProxyBuilder()
232 .withName(name)
233 .withServiceType(DistributedPrimitive.Type.WORK_QUEUE.name())
234 .withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
235 .withCommunicationStrategy(CommunicationStrategy.LEADER)
236 .withTimeout(Duration.ofSeconds(5))
237 .withMaxRetries(5)
238 .build()
239 .open()
240 .join());
241 return new DefaultDistributedWorkQueue<>(atomixWorkQueue, serializer);
Madan Jampani35708a92016-07-06 10:48:19 -0700242 }
243
244 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700245 public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
246 AtomixDocumentTree atomixDocumentTree = new AtomixDocumentTree(client.newProxyBuilder()
247 .withName(name)
248 .withServiceType(DistributedPrimitive.Type.DOCUMENT_TREE.name())
249 .withReadConsistency(ReadConsistency.SEQUENTIAL)
250 .withCommunicationStrategy(CommunicationStrategy.ANY)
251 .withTimeout(Duration.ofSeconds(30))
252 .withMaxRetries(5)
253 .build()
254 .open()
255 .join());
256 return new DefaultDistributedDocumentTree<>(name, atomixDocumentTree, serializer);
Madan Jampani2914e4e2016-09-13 17:48:56 -0700257 }
258
259 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700260 public AsyncLeaderElector newAsyncLeaderElector(String name) {
261 AtomixLeaderElector leaderElector = new AtomixLeaderElector(client.newProxyBuilder()
262 .withName(name)
263 .withServiceType(DistributedPrimitive.Type.LEADER_ELECTOR.name())
264 .withReadConsistency(ReadConsistency.LINEARIZABLE)
265 .withCommunicationStrategy(CommunicationStrategy.LEADER)
266 .withTimeout(Duration.ofSeconds(5))
267 .withMaxRetries(5)
268 .build()
269 .open()
270 .join());
271 leaderElector.setupCache().join();
272 return leaderElector;
Madan Jampani15b8ef52016-02-02 17:35:05 -0800273 }
274
275 @Override
Madan Jampanie14a09c2016-02-11 10:43:21 -0800276 public Set<String> getAsyncConsistentMapNames() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700277 return client.metadata().getSessions(DistributedPrimitive.Type.CONSISTENT_MAP.name())
278 .join()
279 .stream()
280 .map(RaftSessionMetadata::serviceName)
281 .collect(Collectors.toSet());
Madan Jampanie14a09c2016-02-11 10:43:21 -0800282 }
283
284 @Override
285 public Set<String> getAsyncAtomicCounterNames() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700286 return client.metadata().getSessions(DistributedPrimitive.Type.COUNTER.name())
287 .join()
288 .stream()
289 .map(RaftSessionMetadata::serviceName)
290 .collect(Collectors.toSet());
Madan Jampanie14a09c2016-02-11 10:43:21 -0800291 }
292
293 @Override
Madan Jampani35708a92016-07-06 10:48:19 -0700294 public Set<String> getWorkQueueNames() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700295 return client.metadata().getSessions(DistributedPrimitive.Type.WORK_QUEUE.name())
296 .join()
297 .stream()
298 .map(RaftSessionMetadata::serviceName)
299 .collect(Collectors.toSet());
Madan Jampani35708a92016-07-06 10:48:19 -0700300 }
301
302 @Override
Madan Jampani15b8ef52016-02-02 17:35:05 -0800303 public boolean isOpen() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700304 return client != null;
Madan Jampani15b8ef52016-02-02 17:35:05 -0800305 }
Madan Jampani471a7bc2016-04-27 15:51:38 -0700306
Madan Jampaniccdf9da2016-05-05 14:37:27 -0700307 /**
308 * Returns the {@link PartitionClientInfo information} for this client.
309 * @return partition client information
310 */
311 public PartitionClientInfo clientInfo() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700312 return new PartitionClientInfo(partition.getId(), partition.getMembers());
Madan Jampaniccdf9da2016-05-05 14:37:27 -0700313 }
314
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700315 private RaftClient newRaftClient(RaftClientProtocol protocol) {
316 return RaftClient.newBuilder()
317 .withClientId("partition-" + partition.getId())
318 .withMemberId(MemberId.from(localMemberId.id()))
319 .withProtocol(protocol)
Madan Jampani471a7bc2016-04-27 15:51:38 -0700320 .build();
Madan Jampanid5b200f2016-06-06 17:15:25 -0700321 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800322}