blob: 6241855133ae3646093b0e72c1483e0319b205b6 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Thomas Vachuskab6d31672018-07-27 17:03:46 -070016package org.onosproject.store.atomix.primitives.impl;
Jordan Halterman00e92da2018-05-22 23:05:52 -070017
Jordan Haltermanbc982392018-08-07 15:02:37 -070018import java.time.Duration;
Jordan Halterman00e92da2018-05-22 23:05:52 -070019import java.util.Collection;
20import java.util.Map;
21import java.util.Set;
22import java.util.concurrent.CompletableFuture;
23import java.util.concurrent.Executor;
24import java.util.function.BiFunction;
25import java.util.function.Predicate;
26
Jordan Halterman00e92da2018-05-22 23:05:52 -070027import com.google.common.collect.Maps;
28import io.atomix.core.collection.impl.TranscodingAsyncDistributedCollection;
29import io.atomix.core.set.impl.TranscodingAsyncDistributedSet;
30import org.onosproject.store.primitives.MapUpdate;
31import org.onosproject.store.primitives.TransactionId;
32import org.onosproject.store.service.AsyncConsistentMap;
33import org.onosproject.store.service.AsyncIterator;
Jordan Halterman00e92da2018-05-22 23:05:52 -070034import org.onosproject.store.service.MapEvent;
35import org.onosproject.store.service.MapEventListener;
36import org.onosproject.store.service.TransactionLog;
37import org.onosproject.store.service.Version;
38import org.onosproject.store.service.Versioned;
39
Jordan Halterman6cf60c32018-08-15 01:22:51 -070040import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptMapFuture;
41
Jordan Halterman00e92da2018-05-22 23:05:52 -070042/**
43 * Atomix consistent map.
44 */
45public class AtomixConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
46 private final io.atomix.core.map.AsyncAtomicMap<K, V> atomixMap;
47 private final Map<MapEventListener<K, V>, io.atomix.core.map.AtomicMapEventListener<K, V>> listenerMap =
48 Maps.newIdentityHashMap();
49
50 public AtomixConsistentMap(io.atomix.core.map.AsyncAtomicMap<K, V> atomixMap) {
51 this.atomixMap = atomixMap;
52 }
53
54 @Override
55 public String name() {
56 return atomixMap.name();
57 }
58
59 @Override
60 public CompletableFuture<Integer> size() {
61 return atomixMap.size();
62 }
63
64 @Override
65 public CompletableFuture<Boolean> containsKey(K key) {
66 return atomixMap.containsKey(key);
67 }
68
69 @Override
70 public CompletableFuture<Boolean> containsValue(V value) {
71 return atomixMap.containsValue(value);
72 }
73
74 @Override
75 public CompletableFuture<Versioned<V>> get(K key) {
76 return atomixMap.get(key).thenApply(this::toVersioned);
77 }
78
79 @Override
80 public CompletableFuture<Versioned<V>> getOrDefault(K key, V defaultValue) {
81 return atomixMap.getOrDefault(key, defaultValue).thenApply(this::toVersioned);
82 }
83
84 @Override
85 public CompletableFuture<Versioned<V>> computeIf(
86 K key, Predicate<? super V> condition, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -070087 return adaptMapFuture(atomixMap.computeIf(key, condition, remappingFunction).thenApply(this::toVersioned));
Jordan Halterman00e92da2018-05-22 23:05:52 -070088 }
89
90 @Override
91 public CompletableFuture<Versioned<V>> put(K key, V value) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -070092 return adaptMapFuture(atomixMap.put(key, value).thenApply(this::toVersioned));
Jordan Halterman00e92da2018-05-22 23:05:52 -070093 }
94
95 @Override
96 public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -070097 return adaptMapFuture(atomixMap.putAndGet(key, value).thenApply(this::toVersioned));
Jordan Halterman00e92da2018-05-22 23:05:52 -070098 }
99
100 @Override
101 public CompletableFuture<Versioned<V>> remove(K key) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700102 return adaptMapFuture(atomixMap.remove(key).thenApply(this::toVersioned));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700103 }
104
105 @Override
106 public CompletableFuture<Void> clear() {
107 return atomixMap.clear();
108 }
109
110 @Override
111 public CompletableFuture<Set<K>> keySet() {
Jordan Haltermanbc982392018-08-07 15:02:37 -0700112 return CompletableFuture.completedFuture(atomixMap.keySet()
113 .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS)));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700114 }
115
116 @Override
117 public CompletableFuture<Collection<Versioned<V>>> values() {
118 return CompletableFuture.completedFuture(
119 new TranscodingAsyncDistributedCollection<Versioned<V>, io.atomix.utils.time.Versioned<V>>(
120 atomixMap.values(),
121 v -> new io.atomix.utils.time.Versioned<>(v.value(), v.version()),
Jordan Haltermanbc982392018-08-07 15:02:37 -0700122 v -> new Versioned<>(v.value(), v.version()))
123 .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS)));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700124 }
125
126 @Override
127 public CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet() {
128 return CompletableFuture.completedFuture(
129 new TranscodingAsyncDistributedSet<Map.Entry<K, Versioned<V>>,
130 Map.Entry<K, io.atomix.utils.time.Versioned<V>>>(
131 atomixMap.entrySet(),
132 e -> Maps.immutableEntry(e.getKey(),
133 new io.atomix.utils.time.Versioned<>(e.getValue().value(), e.getValue().version())),
134 e -> Maps.immutableEntry(e.getKey(), new Versioned<>(e.getValue().value(), e.getValue().version())))
Jordan Haltermanbc982392018-08-07 15:02:37 -0700135 .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS)));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700136 }
137
138 @Override
139 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700140 return adaptMapFuture(atomixMap.putIfAbsent(key, value).thenApply(this::toVersioned));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700141 }
142
143 @Override
144 public CompletableFuture<Boolean> remove(K key, V value) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700145 return adaptMapFuture(atomixMap.remove(key, value));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700146 }
147
148 @Override
149 public CompletableFuture<Boolean> remove(K key, long version) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700150 return adaptMapFuture(atomixMap.remove(key, version));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700151 }
152
153 @Override
154 public CompletableFuture<Versioned<V>> replace(K key, V value) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700155 return adaptMapFuture(atomixMap.replace(key, value).thenApply(this::toVersioned));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700156 }
157
158 @Override
159 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700160 return adaptMapFuture(atomixMap.replace(key, oldValue, newValue));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700161 }
162
163 @Override
164 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700165 return adaptMapFuture(atomixMap.replace(key, oldVersion, newValue));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700166 }
167
168 @Override
169 public CompletableFuture<AsyncIterator<Map.Entry<K, Versioned<V>>>> iterator() {
170 io.atomix.core.iterator.AsyncIterator<Map.Entry<K, io.atomix.utils.time.Versioned<V>>> atomixIterator
171 = atomixMap.entrySet().iterator();
172 return CompletableFuture.completedFuture(new AsyncIterator<Map.Entry<K, Versioned<V>>>() {
173 @Override
174 public CompletableFuture<Boolean> hasNext() {
175 return atomixIterator.hasNext();
176 }
177
178 @Override
179 public CompletableFuture<Map.Entry<K, Versioned<V>>> next() {
180 return atomixIterator.next()
181 .thenApply(entry -> Maps.immutableEntry(entry.getKey(), toVersioned(entry.getValue())));
182 }
183 });
184 }
185
186 @Override
187 public synchronized CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
188 io.atomix.core.map.AtomicMapEventListener<K, V> atomixListener = event ->
189 listener.event(new MapEvent<K, V>(
190 MapEvent.Type.valueOf(event.type().name()),
191 name(),
192 event.key(),
193 toVersioned(event.newValue()),
194 toVersioned(event.oldValue())));
195 listenerMap.put(listener, atomixListener);
196 return atomixMap.addListener(atomixListener, executor);
197 }
198
199 @Override
200 public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
201 io.atomix.core.map.AtomicMapEventListener<K, V> atomixListener = listenerMap.remove(listener);
202 if (atomixListener != null) {
203 return atomixMap.removeListener(atomixListener);
204 }
205 return CompletableFuture.completedFuture(null);
206 }
207
208 @Override
209 public CompletableFuture<Version> begin(TransactionId transactionId) {
210 throw new UnsupportedOperationException();
211 }
212
213 @Override
214 public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
215 throw new UnsupportedOperationException();
216 }
217
218 @Override
219 public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
220 throw new UnsupportedOperationException();
221 }
222
223 @Override
224 public CompletableFuture<Void> commit(TransactionId transactionId) {
225 throw new UnsupportedOperationException();
226 }
227
228 @Override
229 public CompletableFuture<Void> rollback(TransactionId transactionId) {
230 throw new UnsupportedOperationException();
231 }
232
Jordan Halterman00e92da2018-05-22 23:05:52 -0700233 private Versioned<V> toVersioned(io.atomix.utils.time.Versioned<V> versioned) {
234 return versioned != null
235 ? new Versioned<>(versioned.value(), versioned.version(), versioned.creationTime())
236 : null;
237 }
238}