blob: 645544dbf65e19976160e883e5e2a751348404cf [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Thomas Vachuskab6d31672018-07-27 17:03:46 -070016package org.onosproject.store.atomix.primitives.impl;
Jordan Halterman00e92da2018-05-22 23:05:52 -070017
Jordan Haltermanbc982392018-08-07 15:02:37 -070018import java.time.Duration;
Jordan Halterman00e92da2018-05-22 23:05:52 -070019import java.util.Collection;
20import java.util.Map;
21import java.util.Set;
22import java.util.concurrent.CompletableFuture;
23import java.util.concurrent.Executor;
24import java.util.function.BiFunction;
25import java.util.function.Predicate;
26
27import com.google.common.base.Throwables;
28import com.google.common.collect.Maps;
29import io.atomix.core.collection.impl.TranscodingAsyncDistributedCollection;
30import io.atomix.core.set.impl.TranscodingAsyncDistributedSet;
31import org.onosproject.store.primitives.MapUpdate;
32import org.onosproject.store.primitives.TransactionId;
33import org.onosproject.store.service.AsyncConsistentMap;
34import org.onosproject.store.service.AsyncIterator;
35import org.onosproject.store.service.ConsistentMapException;
36import org.onosproject.store.service.MapEvent;
37import org.onosproject.store.service.MapEventListener;
38import org.onosproject.store.service.TransactionLog;
39import org.onosproject.store.service.Version;
40import org.onosproject.store.service.Versioned;
41
42/**
43 * Atomix consistent map.
44 */
45public class AtomixConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
46 private final io.atomix.core.map.AsyncAtomicMap<K, V> atomixMap;
47 private final Map<MapEventListener<K, V>, io.atomix.core.map.AtomicMapEventListener<K, V>> listenerMap =
48 Maps.newIdentityHashMap();
49
50 public AtomixConsistentMap(io.atomix.core.map.AsyncAtomicMap<K, V> atomixMap) {
51 this.atomixMap = atomixMap;
52 }
53
54 @Override
55 public String name() {
56 return atomixMap.name();
57 }
58
59 @Override
60 public CompletableFuture<Integer> size() {
61 return atomixMap.size();
62 }
63
64 @Override
65 public CompletableFuture<Boolean> containsKey(K key) {
66 return atomixMap.containsKey(key);
67 }
68
69 @Override
70 public CompletableFuture<Boolean> containsValue(V value) {
71 return atomixMap.containsValue(value);
72 }
73
74 @Override
75 public CompletableFuture<Versioned<V>> get(K key) {
76 return atomixMap.get(key).thenApply(this::toVersioned);
77 }
78
79 @Override
80 public CompletableFuture<Versioned<V>> getOrDefault(K key, V defaultValue) {
81 return atomixMap.getOrDefault(key, defaultValue).thenApply(this::toVersioned);
82 }
83
84 @Override
85 public CompletableFuture<Versioned<V>> computeIf(
86 K key, Predicate<? super V> condition, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
87 return adapt(atomixMap.computeIf(key, condition, remappingFunction).thenApply(this::toVersioned));
88 }
89
90 @Override
91 public CompletableFuture<Versioned<V>> put(K key, V value) {
92 return adapt(atomixMap.put(key, value).thenApply(this::toVersioned));
93 }
94
95 @Override
96 public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
97 return adapt(atomixMap.putAndGet(key, value).thenApply(this::toVersioned));
98 }
99
100 @Override
101 public CompletableFuture<Versioned<V>> remove(K key) {
102 return adapt(atomixMap.remove(key).thenApply(this::toVersioned));
103 }
104
105 @Override
106 public CompletableFuture<Void> clear() {
107 return atomixMap.clear();
108 }
109
110 @Override
111 public CompletableFuture<Set<K>> keySet() {
Jordan Haltermanbc982392018-08-07 15:02:37 -0700112 return CompletableFuture.completedFuture(atomixMap.keySet()
113 .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS)));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700114 }
115
116 @Override
117 public CompletableFuture<Collection<Versioned<V>>> values() {
118 return CompletableFuture.completedFuture(
119 new TranscodingAsyncDistributedCollection<Versioned<V>, io.atomix.utils.time.Versioned<V>>(
120 atomixMap.values(),
121 v -> new io.atomix.utils.time.Versioned<>(v.value(), v.version()),
Jordan Haltermanbc982392018-08-07 15:02:37 -0700122 v -> new Versioned<>(v.value(), v.version()))
123 .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS)));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700124 }
125
126 @Override
127 public CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet() {
128 return CompletableFuture.completedFuture(
129 new TranscodingAsyncDistributedSet<Map.Entry<K, Versioned<V>>,
130 Map.Entry<K, io.atomix.utils.time.Versioned<V>>>(
131 atomixMap.entrySet(),
132 e -> Maps.immutableEntry(e.getKey(),
133 new io.atomix.utils.time.Versioned<>(e.getValue().value(), e.getValue().version())),
134 e -> Maps.immutableEntry(e.getKey(), new Versioned<>(e.getValue().value(), e.getValue().version())))
Jordan Haltermanbc982392018-08-07 15:02:37 -0700135 .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS)));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700136 }
137
138 @Override
139 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
140 return adapt(atomixMap.putIfAbsent(key, value).thenApply(this::toVersioned));
141 }
142
143 @Override
144 public CompletableFuture<Boolean> remove(K key, V value) {
145 return adapt(atomixMap.remove(key, value));
146 }
147
148 @Override
149 public CompletableFuture<Boolean> remove(K key, long version) {
150 return adapt(atomixMap.remove(key, version));
151 }
152
153 @Override
154 public CompletableFuture<Versioned<V>> replace(K key, V value) {
155 return adapt(atomixMap.replace(key, value).thenApply(this::toVersioned));
156 }
157
158 @Override
159 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
160 return adapt(atomixMap.replace(key, oldValue, newValue));
161 }
162
163 @Override
164 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
165 return adapt(atomixMap.replace(key, oldVersion, newValue));
166 }
167
168 @Override
169 public CompletableFuture<AsyncIterator<Map.Entry<K, Versioned<V>>>> iterator() {
170 io.atomix.core.iterator.AsyncIterator<Map.Entry<K, io.atomix.utils.time.Versioned<V>>> atomixIterator
171 = atomixMap.entrySet().iterator();
172 return CompletableFuture.completedFuture(new AsyncIterator<Map.Entry<K, Versioned<V>>>() {
173 @Override
174 public CompletableFuture<Boolean> hasNext() {
175 return atomixIterator.hasNext();
176 }
177
178 @Override
179 public CompletableFuture<Map.Entry<K, Versioned<V>>> next() {
180 return atomixIterator.next()
181 .thenApply(entry -> Maps.immutableEntry(entry.getKey(), toVersioned(entry.getValue())));
182 }
183 });
184 }
185
186 @Override
187 public synchronized CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
188 io.atomix.core.map.AtomicMapEventListener<K, V> atomixListener = event ->
189 listener.event(new MapEvent<K, V>(
190 MapEvent.Type.valueOf(event.type().name()),
191 name(),
192 event.key(),
193 toVersioned(event.newValue()),
194 toVersioned(event.oldValue())));
195 listenerMap.put(listener, atomixListener);
196 return atomixMap.addListener(atomixListener, executor);
197 }
198
199 @Override
200 public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
201 io.atomix.core.map.AtomicMapEventListener<K, V> atomixListener = listenerMap.remove(listener);
202 if (atomixListener != null) {
203 return atomixMap.removeListener(atomixListener);
204 }
205 return CompletableFuture.completedFuture(null);
206 }
207
208 @Override
209 public CompletableFuture<Version> begin(TransactionId transactionId) {
210 throw new UnsupportedOperationException();
211 }
212
213 @Override
214 public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
215 throw new UnsupportedOperationException();
216 }
217
218 @Override
219 public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
220 throw new UnsupportedOperationException();
221 }
222
223 @Override
224 public CompletableFuture<Void> commit(TransactionId transactionId) {
225 throw new UnsupportedOperationException();
226 }
227
228 @Override
229 public CompletableFuture<Void> rollback(TransactionId transactionId) {
230 throw new UnsupportedOperationException();
231 }
232
233 private <T> CompletableFuture<T> adapt(CompletableFuture<T> future) {
234 CompletableFuture<T> newFuture = new CompletableFuture<>();
235 future.whenComplete((result, error) -> {
236 if (error == null) {
237 newFuture.complete(result);
238 } else {
239 Throwable cause = Throwables.getRootCause(error);
240 if (cause instanceof io.atomix.primitive.PrimitiveException.ConcurrentModification) {
241 newFuture.completeExceptionally(
242 new ConsistentMapException.ConcurrentModification(cause.getMessage()));
243 } else if (cause instanceof io.atomix.primitive.PrimitiveException.Timeout) {
244 newFuture.completeExceptionally(new ConsistentMapException.Timeout(cause.getMessage()));
245 } else if (cause instanceof io.atomix.primitive.PrimitiveException.Interrupted) {
246 newFuture.completeExceptionally(new ConsistentMapException.Interrupted());
247 } else if (cause instanceof io.atomix.primitive.PrimitiveException.Unavailable) {
248 newFuture.completeExceptionally(new ConsistentMapException.Unavailable());
249 } else if (cause instanceof io.atomix.primitive.PrimitiveException) {
250 newFuture.completeExceptionally(new ConsistentMapException(cause.getMessage()));
251 }
252 }
253 });
254 return newFuture;
255 }
256
257 private Versioned<V> toVersioned(io.atomix.utils.time.Versioned<V> versioned) {
258 return versioned != null
259 ? new Versioned<>(versioned.value(), versioned.version(), versioned.creationTime())
260 : null;
261 }
262}