blob: 230060aabef353756dd7c285ae2785623955012f [file] [log] [blame]
Madan Jampani15b8ef52016-02-02 17:35:05 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani15b8ef52016-02-02 17:35:05 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
Madan Jampani2f9cc712016-02-15 19:36:21 -080018import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani15b8ef52016-02-02 17:35:05 -080019import io.atomix.Atomix;
20import io.atomix.AtomixClient;
Madan Jampani471a7bc2016-04-27 15:51:38 -070021import io.atomix.catalyst.transport.Address;
Madan Jampani15b8ef52016-02-02 17:35:05 -080022import io.atomix.catalyst.transport.Transport;
Madan Jampani471a7bc2016-04-27 15:51:38 -070023import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
24import io.atomix.copycat.client.ConnectionStrategies;
25import io.atomix.copycat.client.CopycatClient;
26import io.atomix.copycat.client.RecoveryStrategies;
27import io.atomix.copycat.client.RetryStrategies;
28import io.atomix.copycat.client.ServerSelectionStrategies;
29import io.atomix.manager.ResourceClient;
30import io.atomix.manager.state.ResourceManagerException;
31import io.atomix.manager.util.ResourceManagerTypeResolver;
32import io.atomix.resource.ResourceType;
33import io.atomix.resource.util.ResourceRegistry;
Madan Jampani15b8ef52016-02-02 17:35:05 -080034import io.atomix.variables.DistributedLong;
35
Madan Jampani471a7bc2016-04-27 15:51:38 -070036import java.util.Collection;
Madan Jampanie14a09c2016-02-11 10:43:21 -080037import java.util.Set;
Madan Jampani15b8ef52016-02-02 17:35:05 -080038import java.util.concurrent.CompletableFuture;
39
40import org.onlab.util.HexString;
41import org.onosproject.store.primitives.DistributedPrimitiveCreator;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
43import org.onosproject.store.primitives.resources.impl.AtomixCounter;
Madan Jampani39fff102016-02-14 13:17:28 -080044import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
Madan Jampani15b8ef52016-02-02 17:35:05 -080045import org.onosproject.store.serializers.KryoNamespaces;
46import org.onosproject.store.service.AsyncAtomicCounter;
47import org.onosproject.store.service.AsyncAtomicValue;
48import org.onosproject.store.service.AsyncConsistentMap;
49import org.onosproject.store.service.AsyncDistributedSet;
50import org.onosproject.store.service.AsyncLeaderElector;
51import org.onosproject.store.service.DistributedQueue;
52import org.onosproject.store.service.Serializer;
Madan Jampani2f9cc712016-02-15 19:36:21 -080053import org.slf4j.Logger;
Madan Jampani15b8ef52016-02-02 17:35:05 -080054
55import com.google.common.base.Supplier;
56import com.google.common.base.Suppliers;
Madan Jampani15b8ef52016-02-02 17:35:05 -080057
58/**
59 * StoragePartition client.
60 */
61public class StoragePartitionClient implements DistributedPrimitiveCreator, Managed<StoragePartitionClient> {
62
Madan Jampani2f9cc712016-02-15 19:36:21 -080063 private final Logger log = getLogger(getClass());
64
Madan Jampani15b8ef52016-02-02 17:35:05 -080065 private final StoragePartition partition;
66 private final Transport transport;
67 private final io.atomix.catalyst.serializer.Serializer serializer;
Madan Jampani15b8ef52016-02-02 17:35:05 -080068 private Atomix client;
Madan Jampani471a7bc2016-04-27 15:51:38 -070069 private CopycatClient copycatClient;
Madan Jampani15b8ef52016-02-02 17:35:05 -080070 private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
71 private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
72 Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
73 Serializer.using(KryoNamespaces.BASIC)));
74
75 public StoragePartitionClient(StoragePartition partition,
76 io.atomix.catalyst.serializer.Serializer serializer,
Madan Jampani65f24bb2016-03-15 15:16:18 -070077 Transport transport) {
Madan Jampani15b8ef52016-02-02 17:35:05 -080078 this.partition = partition;
79 this.serializer = serializer;
80 this.transport = transport;
Madan Jampani15b8ef52016-02-02 17:35:05 -080081 }
82
83 @Override
84 public CompletableFuture<Void> open() {
85 if (client != null && client.isOpen()) {
86 return CompletableFuture.completedFuture(null);
87 }
88 synchronized (StoragePartitionClient.this) {
Madan Jampani471a7bc2016-04-27 15:51:38 -070089 copycatClient = newCopycatClient(partition.getMemberAddresses(),
90 transport,
91 serializer.clone(),
92 StoragePartition.RESOURCE_TYPES);
93 copycatClient.onStateChange(state -> log.info("Client state {}", state));
94 client = new AtomixClient(new ResourceClient(copycatClient));
Madan Jampani15b8ef52016-02-02 17:35:05 -080095 }
Madan Jampani2f9cc712016-02-15 19:36:21 -080096 return client.open().whenComplete((r, e) -> {
97 if (e == null) {
98 log.info("Successfully started client for partition {}", partition.getId());
99 } else {
100 log.info("Failed to start client for partition {}", partition.getId(), e);
101 }
102 }).thenApply(v -> null);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800103 }
104
105 @Override
106 public CompletableFuture<Void> close() {
107 return client != null ? client.close() : CompletableFuture.completedFuture(null);
108 }
109
110 @Override
111 public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
112 AsyncConsistentMap<String, byte[]> rawMap =
Madan Jampani65f24bb2016-03-15 15:16:18 -0700113 new DelegatingAsyncConsistentMap<String, byte[]>(client.getResource(name, AtomixConsistentMap.class)
114 .join()) {
Madan Jampani15b8ef52016-02-02 17:35:05 -0800115 @Override
116 public String name() {
117 return name;
118 }
119 };
120 AsyncConsistentMap<K, V> transcodedMap = DistributedPrimitives.<K, V, String, byte[]>newTranscodingMap(rawMap,
121 key -> HexString.toHexString(serializer.encode(key)),
122 string -> serializer.decode(HexString.fromHexString(string)),
123 value -> value == null ? null : serializer.encode(value),
124 bytes -> serializer.decode(bytes));
125
Madan Jampani542d9e22016-04-05 15:39:55 -0700126 return transcodedMap;
Madan Jampani15b8ef52016-02-02 17:35:05 -0800127 }
128
129 @Override
130 public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
131 return DistributedPrimitives.newSetFromMap(this.<E, Boolean>newAsyncConsistentMap(name, serializer));
132 }
133
134 @Override
135 public AsyncAtomicCounter newAsyncCounter(String name) {
Madan Jampani65f24bb2016-03-15 15:16:18 -0700136 DistributedLong distributedLong = client.getLong(name).join();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800137 return new AtomixCounter(name, distributedLong);
138 }
139
140 @Override
141 public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
142 return new DefaultAsyncAtomicValue<>(name,
143 serializer,
144 onosAtomicValuesMap.get());
145 }
146
147 @Override
148 public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
149 // TODO: Implement
150 throw new UnsupportedOperationException();
151 }
152
153 @Override
154 public AsyncLeaderElector newAsyncLeaderElector(String name) {
Madan Jampani65f24bb2016-03-15 15:16:18 -0700155 return client.getResource(name, AtomixLeaderElector.class).join();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800156 }
157
158 @Override
Madan Jampanie14a09c2016-02-11 10:43:21 -0800159 public Set<String> getAsyncConsistentMapNames() {
160 return client.keys(AtomixConsistentMap.class).join();
161 }
162
163 @Override
164 public Set<String> getAsyncAtomicCounterNames() {
165 return client.keys(DistributedLong.class).join();
166 }
167
168 @Override
Madan Jampani15b8ef52016-02-02 17:35:05 -0800169 public boolean isOpen() {
170 return client.isOpen();
171 }
Madan Jampani471a7bc2016-04-27 15:51:38 -0700172
173 private CopycatClient newCopycatClient(Collection<Address> members,
174 Transport transport,
175 io.atomix.catalyst.serializer.Serializer serializer,
176 Collection<ResourceType> resourceTypes) {
177 ResourceRegistry registry = new ResourceRegistry();
178 resourceTypes.forEach(registry::register);
179 CopycatClient client = CopycatClient.builder(members)
180 .withServerSelectionStrategy(ServerSelectionStrategies.ANY)
181 .withConnectionStrategy(ConnectionStrategies.FIBONACCI_BACKOFF)
182 .withRecoveryStrategy(RecoveryStrategies.RECOVER)
183 .withRetryStrategy(RetryStrategies.FIBONACCI_BACKOFF)
184 .withTransport(transport)
185 .withSerializer(serializer)
186 .withThreadFactory(new CatalystThreadFactory(String.format("copycat-client-%s", partition.getId())))
187 .build();
188 client.serializer().resolve(new ResourceManagerTypeResolver());
189 for (ResourceType type : registry.types()) {
190 try {
191 type.factory().newInstance().createSerializableTypeResolver().resolve(client.serializer().registry());
192 } catch (InstantiationException | IllegalAccessException e) {
193 throw new ResourceManagerException(e);
194 }
195 }
196 return client;
197 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800198}