blob: c70f4e474f7d682e07fd74c286596e1a07dca78b [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
Madan Jampani2f9cc712016-02-15 19:36:21 -080018import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani15b8ef52016-02-02 17:35:05 -080019import io.atomix.Atomix;
20import io.atomix.AtomixClient;
21import io.atomix.catalyst.transport.Transport;
Madan Jampani15b8ef52016-02-02 17:35:05 -080022import io.atomix.variables.DistributedLong;
23
Madan Jampanie14a09c2016-02-11 10:43:21 -080024import java.util.Set;
Madan Jampani15b8ef52016-02-02 17:35:05 -080025import java.util.concurrent.CompletableFuture;
26
27import org.onlab.util.HexString;
28import org.onosproject.store.primitives.DistributedPrimitiveCreator;
29import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
30import org.onosproject.store.primitives.resources.impl.AtomixCounter;
Madan Jampani39fff102016-02-14 13:17:28 -080031import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
Madan Jampani15b8ef52016-02-02 17:35:05 -080032import org.onosproject.store.serializers.KryoNamespaces;
33import org.onosproject.store.service.AsyncAtomicCounter;
34import org.onosproject.store.service.AsyncAtomicValue;
35import org.onosproject.store.service.AsyncConsistentMap;
36import org.onosproject.store.service.AsyncDistributedSet;
37import org.onosproject.store.service.AsyncLeaderElector;
38import org.onosproject.store.service.DistributedQueue;
39import org.onosproject.store.service.Serializer;
Madan Jampani2f9cc712016-02-15 19:36:21 -080040import org.slf4j.Logger;
Madan Jampani15b8ef52016-02-02 17:35:05 -080041
42import com.google.common.base.Supplier;
43import com.google.common.base.Suppliers;
Madan Jampani15b8ef52016-02-02 17:35:05 -080044
45/**
46 * StoragePartition client.
47 */
48public class StoragePartitionClient implements DistributedPrimitiveCreator, Managed<StoragePartitionClient> {
49
Madan Jampani2f9cc712016-02-15 19:36:21 -080050 private final Logger log = getLogger(getClass());
51
Madan Jampani15b8ef52016-02-02 17:35:05 -080052 private final StoragePartition partition;
53 private final Transport transport;
54 private final io.atomix.catalyst.serializer.Serializer serializer;
Madan Jampani15b8ef52016-02-02 17:35:05 -080055 private Atomix client;
56 private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
57 private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
58 Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
59 Serializer.using(KryoNamespaces.BASIC)));
60
61 public StoragePartitionClient(StoragePartition partition,
62 io.atomix.catalyst.serializer.Serializer serializer,
Madan Jampani65f24bb2016-03-15 15:16:18 -070063 Transport transport) {
Madan Jampani15b8ef52016-02-02 17:35:05 -080064 this.partition = partition;
65 this.serializer = serializer;
66 this.transport = transport;
Madan Jampani15b8ef52016-02-02 17:35:05 -080067 }
68
69 @Override
70 public CompletableFuture<Void> open() {
71 if (client != null && client.isOpen()) {
72 return CompletableFuture.completedFuture(null);
73 }
74 synchronized (StoragePartitionClient.this) {
75 client = AtomixClient.builder(partition.getMemberAddresses())
Madan Jampani65f24bb2016-03-15 15:16:18 -070076 .withResourceTypes(StoragePartition.RESOURCE_TYPES)
Madan Jampani15b8ef52016-02-02 17:35:05 -080077 .withSerializer(serializer.clone())
Madan Jampani15b8ef52016-02-02 17:35:05 -080078 .withTransport(transport)
79 .build();
80 }
Madan Jampani2f9cc712016-02-15 19:36:21 -080081 return client.open().whenComplete((r, e) -> {
82 if (e == null) {
83 log.info("Successfully started client for partition {}", partition.getId());
84 } else {
85 log.info("Failed to start client for partition {}", partition.getId(), e);
86 }
87 }).thenApply(v -> null);
Madan Jampani15b8ef52016-02-02 17:35:05 -080088 }
89
90 @Override
91 public CompletableFuture<Void> close() {
92 return client != null ? client.close() : CompletableFuture.completedFuture(null);
93 }
94
95 @Override
96 public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
97 AsyncConsistentMap<String, byte[]> rawMap =
Madan Jampani65f24bb2016-03-15 15:16:18 -070098 new DelegatingAsyncConsistentMap<String, byte[]>(client.getResource(name, AtomixConsistentMap.class)
99 .join()) {
Madan Jampani15b8ef52016-02-02 17:35:05 -0800100 @Override
101 public String name() {
102 return name;
103 }
104 };
105 AsyncConsistentMap<K, V> transcodedMap = DistributedPrimitives.<K, V, String, byte[]>newTranscodingMap(rawMap,
106 key -> HexString.toHexString(serializer.encode(key)),
107 string -> serializer.decode(HexString.fromHexString(string)),
108 value -> value == null ? null : serializer.encode(value),
109 bytes -> serializer.decode(bytes));
110
111 return DistributedPrimitives.newCachingMap(transcodedMap);
112 }
113
114 @Override
115 public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
116 return DistributedPrimitives.newSetFromMap(this.<E, Boolean>newAsyncConsistentMap(name, serializer));
117 }
118
119 @Override
120 public AsyncAtomicCounter newAsyncCounter(String name) {
Madan Jampani65f24bb2016-03-15 15:16:18 -0700121 DistributedLong distributedLong = client.getLong(name).join();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800122 return new AtomixCounter(name, distributedLong);
123 }
124
125 @Override
126 public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
127 return new DefaultAsyncAtomicValue<>(name,
128 serializer,
129 onosAtomicValuesMap.get());
130 }
131
132 @Override
133 public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
134 // TODO: Implement
135 throw new UnsupportedOperationException();
136 }
137
138 @Override
139 public AsyncLeaderElector newAsyncLeaderElector(String name) {
Madan Jampani65f24bb2016-03-15 15:16:18 -0700140 return client.getResource(name, AtomixLeaderElector.class).join();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800141 }
142
143 @Override
Madan Jampanie14a09c2016-02-11 10:43:21 -0800144 public Set<String> getAsyncConsistentMapNames() {
145 return client.keys(AtomixConsistentMap.class).join();
146 }
147
148 @Override
149 public Set<String> getAsyncAtomicCounterNames() {
150 return client.keys(DistributedLong.class).join();
151 }
152
153 @Override
Madan Jampani15b8ef52016-02-02 17:35:05 -0800154 public boolean isOpen() {
155 return client.isOpen();
156 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800157}