blob: a7bedbb950f991d19727d389925258295050a220 [file] [log] [blame]
Jordan Halterman00e92da2018-05-22 23:05:52 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.Collection;
19import java.util.Map;
20import java.util.NavigableMap;
21import java.util.NavigableSet;
22import java.util.Set;
23import java.util.concurrent.CompletableFuture;
24import java.util.concurrent.Executor;
25import java.util.function.BiFunction;
26import java.util.function.Predicate;
27import java.util.stream.Collectors;
28
29import com.google.common.base.Throwables;
30import com.google.common.collect.Maps;
31import io.atomix.core.collection.impl.TranscodingAsyncDistributedCollection;
32import io.atomix.core.map.impl.DelegatingAsyncDistributedNavigableMap;
33import org.onosproject.store.primitives.MapUpdate;
34import org.onosproject.store.primitives.TransactionId;
35import org.onosproject.store.service.AsyncConsistentTreeMap;
36import org.onosproject.store.service.AsyncIterator;
37import org.onosproject.store.service.ConsistentMapException;
38import org.onosproject.store.service.MapEvent;
39import org.onosproject.store.service.MapEventListener;
40import org.onosproject.store.service.TransactionLog;
41import org.onosproject.store.service.Version;
42import org.onosproject.store.service.Versioned;
43
44/**
45 * Atomix consistent tree map.
46 */
47public class AtomixConsistentTreeMap<V> implements AsyncConsistentTreeMap<V> {
48 private final io.atomix.core.map.AsyncAtomicNavigableMap<String, V> atomixTreeMap;
49 private final Map<MapEventListener<String, V>, io.atomix.core.map.AtomicMapEventListener<String, V>> listenerMap =
50 Maps.newIdentityHashMap();
51
52 public AtomixConsistentTreeMap(io.atomix.core.map.AsyncAtomicNavigableMap<String, V> atomixTreeMap) {
53 this.atomixTreeMap = atomixTreeMap;
54 }
55
56 @Override
57 public String name() {
58 return atomixTreeMap.name();
59 }
60
61 @Override
62 public CompletableFuture<Integer> size() {
63 return atomixTreeMap.size();
64 }
65
66 @Override
67 public CompletableFuture<String> firstKey() {
68 return atomixTreeMap.firstKey();
69 }
70
71 @Override
72 public CompletableFuture<String> lastKey() {
73 return atomixTreeMap.lastKey();
74 }
75
76 @Override
77 public CompletableFuture<Map.Entry<String, Versioned<V>>> ceilingEntry(String key) {
78 return atomixTreeMap.ceilingEntry(key).thenApply(this::toVersioned);
79 }
80
81 @Override
82 public CompletableFuture<Map.Entry<String, Versioned<V>>> floorEntry(String key) {
83 return atomixTreeMap.floorEntry(key).thenApply(this::toVersioned);
84 }
85
86 @Override
87 public CompletableFuture<Map.Entry<String, Versioned<V>>> higherEntry(String key) {
88 return atomixTreeMap.higherEntry(key).thenApply(this::toVersioned);
89 }
90
91 @Override
92 public CompletableFuture<Map.Entry<String, Versioned<V>>> lowerEntry(String key) {
93 return atomixTreeMap.lowerEntry(key).thenApply(this::toVersioned);
94 }
95
96 @Override
97 public CompletableFuture<Map.Entry<String, Versioned<V>>> firstEntry() {
98 return atomixTreeMap.firstEntry().thenApply(this::toVersioned);
99 }
100
101 @Override
102 public CompletableFuture<Map.Entry<String, Versioned<V>>> lastEntry() {
103 return atomixTreeMap.lastEntry().thenApply(this::toVersioned);
104 }
105
106 @Override
107 public CompletableFuture<Map.Entry<String, Versioned<V>>> pollFirstEntry() {
108 throw new UnsupportedOperationException();
109 }
110
111 @Override
112 public CompletableFuture<Map.Entry<String, Versioned<V>>> pollLastEntry() {
113 throw new UnsupportedOperationException();
114 }
115
116 @Override
117 public CompletableFuture<Boolean> containsKey(String key) {
118 return atomixTreeMap.containsKey(key);
119 }
120
121 @Override
122 public CompletableFuture<String> lowerKey(String key) {
123 return atomixTreeMap.lowerKey(key);
124 }
125
126 @Override
127 public CompletableFuture<String> floorKey(String key) {
128 return atomixTreeMap.floorKey(key);
129 }
130
131 @Override
132 public CompletableFuture<String> ceilingKey(String key) {
133 return atomixTreeMap.ceilingKey(key);
134 }
135
136 @Override
137 public CompletableFuture<Versioned<V>> get(String key) {
138 return atomixTreeMap.get(key).thenApply(this::toVersioned);
139 }
140
141 @Override
142 public CompletableFuture<String> higherKey(String key) {
143 return atomixTreeMap.higherKey(key);
144 }
145
146 @Override
147 public CompletableFuture<NavigableSet<String>> navigableKeySet() {
148 return CompletableFuture.completedFuture(atomixTreeMap.navigableKeySet().sync());
149 }
150
151 @Override
152 public CompletableFuture<Versioned<V>> getOrDefault(String key, V defaultValue) {
153 return atomixTreeMap.getOrDefault(key, defaultValue).thenApply(this::toVersioned);
154 }
155
156 @Override
157 public CompletableFuture<NavigableMap<String, V>> subMap(
158 String upperKey, String lowerKey, boolean inclusiveUpper, boolean inclusiveLower) {
159 return CompletableFuture.completedFuture(
160 new DelegatingAsyncDistributedNavigableMap<>(
161 atomixTreeMap.subMap(lowerKey, inclusiveLower, upperKey, inclusiveUpper)).sync());
162 }
163
164 @Override
165 public CompletableFuture<Versioned<V>> computeIf(
166 String key,
167 Predicate<? super V> condition, BiFunction<? super String, ? super V, ? extends V> remappingFunction) {
168 return adapt(atomixTreeMap.computeIf(key, condition, remappingFunction)).thenApply(this::toVersioned);
169 }
170
171 @Override
172 public CompletableFuture<Versioned<V>> put(String key, V value) {
173 return adapt(atomixTreeMap.put(key, value)).thenApply(this::toVersioned);
174 }
175
176 @Override
177 public CompletableFuture<Versioned<V>> putAndGet(String key, V value) {
178 return adapt(atomixTreeMap.putAndGet(key, value)).thenApply(this::toVersioned);
179 }
180
181 @Override
182 public CompletableFuture<Versioned<V>> remove(String key) {
183 return adapt(atomixTreeMap.remove(key)).thenApply(this::toVersioned);
184 }
185
186 @Override
187 public CompletableFuture<Set<String>> keySet() {
188 return CompletableFuture.completedFuture(atomixTreeMap.keySet().sync());
189 }
190
191 @Override
192 public CompletableFuture<Set<Map.Entry<String, Versioned<V>>>> entrySet() {
193 return CompletableFuture.completedFuture(atomixTreeMap.entrySet().stream()
194 .map(this::toVersioned)
195 .collect(Collectors.toSet()));
196 }
197
198 @Override
199 public CompletableFuture<Versioned<V>> putIfAbsent(String key, V value) {
200 return adapt(atomixTreeMap.putIfAbsent(key, value)).thenApply(this::toVersioned);
201 }
202
203 @Override
204 public CompletableFuture<Boolean> remove(String key, V value) {
205 return adapt(atomixTreeMap.remove(key, value));
206 }
207
208 @Override
209 public CompletableFuture<Boolean> remove(String key, long version) {
210 return adapt(atomixTreeMap.remove(key, version));
211 }
212
213 @Override
214 public CompletableFuture<Versioned<V>> replace(String key, V value) {
215 return adapt(atomixTreeMap.replace(key, value)).thenApply(this::toVersioned);
216 }
217
218 @Override
219 public CompletableFuture<Boolean> replace(String key, V oldValue, V newValue) {
220 return adapt(atomixTreeMap.replace(key, oldValue, newValue));
221 }
222
223 @Override
224 public CompletableFuture<Boolean> replace(String key, long oldVersion, V newValue) {
225 return adapt(atomixTreeMap.replace(key, oldVersion, newValue));
226 }
227
228 @Override
229 public CompletableFuture<Boolean> containsValue(V value) {
230 return atomixTreeMap.containsValue(value);
231 }
232
233 @Override
234 public CompletableFuture<AsyncIterator<Map.Entry<String, Versioned<V>>>> iterator() {
235 io.atomix.core.iterator.AsyncIterator<Map.Entry<String, io.atomix.utils.time.Versioned<V>>> atomixIterator
236 = atomixTreeMap.entrySet().iterator();
237 return CompletableFuture.completedFuture(new AsyncIterator<Map.Entry<String, Versioned<V>>>() {
238 @Override
239 public CompletableFuture<Boolean> hasNext() {
240 return atomixIterator.hasNext();
241 }
242
243 @Override
244 public CompletableFuture<Map.Entry<String, Versioned<V>>> next() {
245 return atomixIterator.next()
246 .thenApply(entry -> Maps.immutableEntry(entry.getKey(), toVersioned(entry.getValue())));
247 }
248 });
249 }
250
251 @Override
252 public CompletableFuture<Void> clear() {
253 return atomixTreeMap.clear();
254 }
255
256 @Override
257 public CompletableFuture<Collection<Versioned<V>>> values() {
258 return CompletableFuture.completedFuture(
259 new TranscodingAsyncDistributedCollection<Versioned<V>, io.atomix.utils.time.Versioned<V>>(
260 atomixTreeMap.values(),
261 e -> new io.atomix.utils.time.Versioned<>(e.value(), e.version()),
262 e -> new Versioned<>(e.value(), e.version())).sync());
263 }
264
265 @Override
266 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, V> listener, Executor executor) {
267 io.atomix.core.map.AtomicMapEventListener<String, V> atomixListener = event ->
268 listener.event(new MapEvent<String, V>(
269 MapEvent.Type.valueOf(event.type().name()),
270 name(),
271 event.key(),
272 toVersioned(event.newValue()),
273 toVersioned(event.oldValue())));
274 listenerMap.put(listener, atomixListener);
275 return atomixTreeMap.addListener(atomixListener, executor);
276 }
277
278 @Override
279 public CompletableFuture<Void> removeListener(MapEventListener<String, V> listener) {
280 io.atomix.core.map.AtomicMapEventListener<String, V> atomixListener = listenerMap.remove(listener);
281 if (atomixListener != null) {
282 return atomixTreeMap.removeListener(atomixListener);
283 }
284 return CompletableFuture.completedFuture(null);
285 }
286
287 @Override
288 public CompletableFuture<Version> begin(TransactionId transactionId) {
289 throw new UnsupportedOperationException();
290 }
291
292 @Override
293 public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, V>> transactionLog) {
294 throw new UnsupportedOperationException();
295 }
296
297 @Override
298 public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, V>> transactionLog) {
299 throw new UnsupportedOperationException();
300 }
301
302 @Override
303 public CompletableFuture<Void> commit(TransactionId transactionId) {
304 throw new UnsupportedOperationException();
305 }
306
307 @Override
308 public CompletableFuture<Void> rollback(TransactionId transactionId) {
309 throw new UnsupportedOperationException();
310 }
311
312 private <T> CompletableFuture<T> adapt(CompletableFuture<T> future) {
313 CompletableFuture<T> newFuture = new CompletableFuture<>();
314 future.whenComplete((result, error) -> {
315 if (error == null) {
316 newFuture.complete(result);
317 } else {
318 Throwable cause = Throwables.getRootCause(error);
319 if (cause instanceof io.atomix.primitive.PrimitiveException.ConcurrentModification) {
320 newFuture.completeExceptionally(
321 new ConsistentMapException.ConcurrentModification(error.getMessage()));
322 } else if (cause instanceof io.atomix.primitive.PrimitiveException.Timeout) {
323 newFuture.completeExceptionally(new ConsistentMapException.Timeout(error.getMessage()));
324 } else if (cause instanceof io.atomix.primitive.PrimitiveException.Interrupted) {
325 newFuture.completeExceptionally(new ConsistentMapException.Interrupted());
326 } else if (cause instanceof io.atomix.primitive.PrimitiveException.Unavailable) {
327 newFuture.completeExceptionally(new ConsistentMapException.Unavailable());
328 } else if (cause instanceof io.atomix.primitive.PrimitiveException) {
329 newFuture.completeExceptionally(new ConsistentMapException(cause.getMessage()));
330 }
331 }
332 });
333 return newFuture;
334 }
335
336 private Versioned<V> toVersioned(io.atomix.utils.time.Versioned<V> versioned) {
337 return versioned != null
338 ? new Versioned<>(versioned.value(), versioned.version(), versioned.creationTime())
339 : null;
340 }
341
342 private Map.Entry<String, Versioned<V>> toVersioned(Map.Entry<String, io.atomix.utils.time.Versioned<V>> entry) {
343 return entry != null ? Maps.immutableEntry(entry.getKey(), toVersioned(entry.getValue())) : null;
344 }
345}