/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Thomas Vachuska | b6d3167 | 2018-07-27 17:03:46 -0700 | [diff] [blame] | 16 | package org.onosproject.store.atomix.primitives.impl; |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 17 | |
Jordan Halterman | bc98239 | 2018-08-07 15:02:37 -0700 | [diff] [blame] | 18 | import java.time.Duration; |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 19 | import java.util.Collection; |
| 20 | import java.util.Map; |
| 21 | import java.util.Set; |
| 22 | import java.util.concurrent.CompletableFuture; |
| 23 | import java.util.concurrent.Executor; |
| 24 | import java.util.function.BiFunction; |
| 25 | import java.util.function.Predicate; |
| 26 | |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 27 | import com.google.common.collect.Maps; |
| 28 | import io.atomix.core.collection.impl.TranscodingAsyncDistributedCollection; |
| 29 | import io.atomix.core.set.impl.TranscodingAsyncDistributedSet; |
| 30 | import org.onosproject.store.primitives.MapUpdate; |
| 31 | import org.onosproject.store.primitives.TransactionId; |
| 32 | import org.onosproject.store.service.AsyncConsistentMap; |
| 33 | import org.onosproject.store.service.AsyncIterator; |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 34 | import org.onosproject.store.service.MapEvent; |
| 35 | import org.onosproject.store.service.MapEventListener; |
| 36 | import org.onosproject.store.service.TransactionLog; |
| 37 | import org.onosproject.store.service.Version; |
| 38 | import org.onosproject.store.service.Versioned; |
| 39 | |
Jordan Halterman | 6cf60c3 | 2018-08-15 01:22:51 -0700 | [diff] [blame] | 40 | import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptMapFuture; |
| 41 | |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 42 | /** |
| 43 | * Atomix consistent map. |
| 44 | */ |
| 45 | public class AtomixConsistentMap<K, V> implements AsyncConsistentMap<K, V> { |
| 46 | private final io.atomix.core.map.AsyncAtomicMap<K, V> atomixMap; |
| 47 | private final Map<MapEventListener<K, V>, io.atomix.core.map.AtomicMapEventListener<K, V>> listenerMap = |
| 48 | Maps.newIdentityHashMap(); |
| 49 | |
| 50 | public AtomixConsistentMap(io.atomix.core.map.AsyncAtomicMap<K, V> atomixMap) { |
| 51 | this.atomixMap = atomixMap; |
| 52 | } |
| 53 | |
| 54 | @Override |
| 55 | public String name() { |
| 56 | return atomixMap.name(); |
| 57 | } |
| 58 | |
| 59 | @Override |
| 60 | public CompletableFuture<Integer> size() { |
| 61 | return atomixMap.size(); |
| 62 | } |
| 63 | |
| 64 | @Override |
| 65 | public CompletableFuture<Boolean> containsKey(K key) { |
| 66 | return atomixMap.containsKey(key); |
| 67 | } |
| 68 | |
| 69 | @Override |
| 70 | public CompletableFuture<Boolean> containsValue(V value) { |
| 71 | return atomixMap.containsValue(value); |
| 72 | } |
| 73 | |
| 74 | @Override |
| 75 | public CompletableFuture<Versioned<V>> get(K key) { |
| 76 | return atomixMap.get(key).thenApply(this::toVersioned); |
| 77 | } |
| 78 | |
| 79 | @Override |
| 80 | public CompletableFuture<Versioned<V>> getOrDefault(K key, V defaultValue) { |
| 81 | return atomixMap.getOrDefault(key, defaultValue).thenApply(this::toVersioned); |
| 82 | } |
| 83 | |
| 84 | @Override |
| 85 | public CompletableFuture<Versioned<V>> computeIf( |
| 86 | K key, Predicate<? super V> condition, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { |
Jordan Halterman | 6cf60c3 | 2018-08-15 01:22:51 -0700 | [diff] [blame] | 87 | return adaptMapFuture(atomixMap.computeIf(key, condition, remappingFunction).thenApply(this::toVersioned)); |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 88 | } |
| 89 | |
| 90 | @Override |
| 91 | public CompletableFuture<Versioned<V>> put(K key, V value) { |
Jordan Halterman | 6cf60c3 | 2018-08-15 01:22:51 -0700 | [diff] [blame] | 92 | return adaptMapFuture(atomixMap.put(key, value).thenApply(this::toVersioned)); |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 93 | } |
| 94 | |
| 95 | @Override |
| 96 | public CompletableFuture<Versioned<V>> putAndGet(K key, V value) { |
Jordan Halterman | 6cf60c3 | 2018-08-15 01:22:51 -0700 | [diff] [blame] | 97 | return adaptMapFuture(atomixMap.putAndGet(key, value).thenApply(this::toVersioned)); |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 98 | } |
| 99 | |
| 100 | @Override |
| 101 | public CompletableFuture<Versioned<V>> remove(K key) { |
Jordan Halterman | 6cf60c3 | 2018-08-15 01:22:51 -0700 | [diff] [blame] | 102 | return adaptMapFuture(atomixMap.remove(key).thenApply(this::toVersioned)); |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 103 | } |
| 104 | |
| 105 | @Override |
| 106 | public CompletableFuture<Void> clear() { |
| 107 | return atomixMap.clear(); |
| 108 | } |
| 109 | |
| 110 | @Override |
| 111 | public CompletableFuture<Set<K>> keySet() { |
Jordan Halterman | bc98239 | 2018-08-07 15:02:37 -0700 | [diff] [blame] | 112 | return CompletableFuture.completedFuture(atomixMap.keySet() |
| 113 | .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS))); |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 114 | } |
| 115 | |
| 116 | @Override |
| 117 | public CompletableFuture<Collection<Versioned<V>>> values() { |
| 118 | return CompletableFuture.completedFuture( |
| 119 | new TranscodingAsyncDistributedCollection<Versioned<V>, io.atomix.utils.time.Versioned<V>>( |
| 120 | atomixMap.values(), |
| 121 | v -> new io.atomix.utils.time.Versioned<>(v.value(), v.version()), |
Jordan Halterman | bc98239 | 2018-08-07 15:02:37 -0700 | [diff] [blame] | 122 | v -> new Versioned<>(v.value(), v.version())) |
| 123 | .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS))); |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 124 | } |
| 125 | |
| 126 | @Override |
| 127 | public CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet() { |
| 128 | return CompletableFuture.completedFuture( |
| 129 | new TranscodingAsyncDistributedSet<Map.Entry<K, Versioned<V>>, |
| 130 | Map.Entry<K, io.atomix.utils.time.Versioned<V>>>( |
| 131 | atomixMap.entrySet(), |
| 132 | e -> Maps.immutableEntry(e.getKey(), |
| 133 | new io.atomix.utils.time.Versioned<>(e.getValue().value(), e.getValue().version())), |
| 134 | e -> Maps.immutableEntry(e.getKey(), new Versioned<>(e.getValue().value(), e.getValue().version()))) |
Jordan Halterman | bc98239 | 2018-08-07 15:02:37 -0700 | [diff] [blame] | 135 | .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS))); |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 136 | } |
| 137 | |
| 138 | @Override |
| 139 | public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) { |
Jordan Halterman | 6cf60c3 | 2018-08-15 01:22:51 -0700 | [diff] [blame] | 140 | return adaptMapFuture(atomixMap.putIfAbsent(key, value).thenApply(this::toVersioned)); |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 141 | } |
| 142 | |
| 143 | @Override |
| 144 | public CompletableFuture<Boolean> remove(K key, V value) { |
Jordan Halterman | 6cf60c3 | 2018-08-15 01:22:51 -0700 | [diff] [blame] | 145 | return adaptMapFuture(atomixMap.remove(key, value)); |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 146 | } |
| 147 | |
| 148 | @Override |
| 149 | public CompletableFuture<Boolean> remove(K key, long version) { |
Jordan Halterman | 6cf60c3 | 2018-08-15 01:22:51 -0700 | [diff] [blame] | 150 | return adaptMapFuture(atomixMap.remove(key, version)); |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 151 | } |
| 152 | |
| 153 | @Override |
| 154 | public CompletableFuture<Versioned<V>> replace(K key, V value) { |
Jordan Halterman | 6cf60c3 | 2018-08-15 01:22:51 -0700 | [diff] [blame] | 155 | return adaptMapFuture(atomixMap.replace(key, value).thenApply(this::toVersioned)); |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 156 | } |
| 157 | |
| 158 | @Override |
| 159 | public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) { |
Jordan Halterman | 6cf60c3 | 2018-08-15 01:22:51 -0700 | [diff] [blame] | 160 | return adaptMapFuture(atomixMap.replace(key, oldValue, newValue)); |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 161 | } |
| 162 | |
| 163 | @Override |
| 164 | public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) { |
Jordan Halterman | 6cf60c3 | 2018-08-15 01:22:51 -0700 | [diff] [blame] | 165 | return adaptMapFuture(atomixMap.replace(key, oldVersion, newValue)); |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 166 | } |
| 167 | |
| 168 | @Override |
| 169 | public CompletableFuture<AsyncIterator<Map.Entry<K, Versioned<V>>>> iterator() { |
| 170 | io.atomix.core.iterator.AsyncIterator<Map.Entry<K, io.atomix.utils.time.Versioned<V>>> atomixIterator |
| 171 | = atomixMap.entrySet().iterator(); |
| 172 | return CompletableFuture.completedFuture(new AsyncIterator<Map.Entry<K, Versioned<V>>>() { |
| 173 | @Override |
| 174 | public CompletableFuture<Boolean> hasNext() { |
| 175 | return atomixIterator.hasNext(); |
| 176 | } |
| 177 | |
| 178 | @Override |
| 179 | public CompletableFuture<Map.Entry<K, Versioned<V>>> next() { |
| 180 | return atomixIterator.next() |
| 181 | .thenApply(entry -> Maps.immutableEntry(entry.getKey(), toVersioned(entry.getValue()))); |
| 182 | } |
| 183 | }); |
| 184 | } |
| 185 | |
| 186 | @Override |
| 187 | public synchronized CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) { |
| 188 | io.atomix.core.map.AtomicMapEventListener<K, V> atomixListener = event -> |
| 189 | listener.event(new MapEvent<K, V>( |
| 190 | MapEvent.Type.valueOf(event.type().name()), |
| 191 | name(), |
| 192 | event.key(), |
| 193 | toVersioned(event.newValue()), |
| 194 | toVersioned(event.oldValue()))); |
| 195 | listenerMap.put(listener, atomixListener); |
| 196 | return atomixMap.addListener(atomixListener, executor); |
| 197 | } |
| 198 | |
| 199 | @Override |
| 200 | public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) { |
| 201 | io.atomix.core.map.AtomicMapEventListener<K, V> atomixListener = listenerMap.remove(listener); |
| 202 | if (atomixListener != null) { |
| 203 | return atomixMap.removeListener(atomixListener); |
| 204 | } |
| 205 | return CompletableFuture.completedFuture(null); |
| 206 | } |
| 207 | |
| 208 | @Override |
| 209 | public CompletableFuture<Version> begin(TransactionId transactionId) { |
| 210 | throw new UnsupportedOperationException(); |
| 211 | } |
| 212 | |
| 213 | @Override |
| 214 | public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) { |
| 215 | throw new UnsupportedOperationException(); |
| 216 | } |
| 217 | |
| 218 | @Override |
| 219 | public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) { |
| 220 | throw new UnsupportedOperationException(); |
| 221 | } |
| 222 | |
| 223 | @Override |
| 224 | public CompletableFuture<Void> commit(TransactionId transactionId) { |
| 225 | throw new UnsupportedOperationException(); |
| 226 | } |
| 227 | |
| 228 | @Override |
| 229 | public CompletableFuture<Void> rollback(TransactionId transactionId) { |
| 230 | throw new UnsupportedOperationException(); |
| 231 | } |
| 232 | |
Jordan Halterman | 00e92da | 2018-05-22 23:05:52 -0700 | [diff] [blame] | 233 | private Versioned<V> toVersioned(io.atomix.utils.time.Versioned<V> versioned) { |
| 234 | return versioned != null |
| 235 | ? new Versioned<>(versioned.value(), versioned.version(), versioned.creationTime()) |
| 236 | : null; |
| 237 | } |
| 238 | } |