blob: 051433303c3141e3e8f03bd1164bf0f313979286 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import java.util.Collection;
Jordan Haltermandae11602018-07-03 00:00:47 -070019import java.util.Collections;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import java.util.ConcurrentModificationException;
Jordan Haltermandae11602018-07-03 00:00:47 -070021import java.util.Iterator;
Madan Jampanifc981772016-02-16 09:46:42 -080022import java.util.List;
Madan Jampani0463cf92016-05-04 14:46:08 -070023import java.util.Map;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080024import java.util.Map.Entry;
Jordan Haltermandae11602018-07-03 00:00:47 -070025import java.util.NoSuchElementException;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080026import java.util.Set;
27import java.util.concurrent.CompletableFuture;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070028import java.util.concurrent.ConcurrentHashMap;
Madan Jampani0463cf92016-05-04 14:46:08 -070029import java.util.concurrent.Executor;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080030import java.util.function.BiFunction;
Jordan Haltermandae11602018-07-03 00:00:47 -070031import java.util.function.Function;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080032import java.util.function.Predicate;
33
Jordan Halterman2bf177c2017-06-29 01:49:08 -070034import io.atomix.protocols.raft.proxy.RaftProxy;
35import org.onlab.util.KryoNamespace;
Jordan Halterman70df7672017-08-03 16:25:19 -070036import org.onlab.util.Tools;
Jordan Halterman948d6592017-04-20 17:18:24 -070037import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080038import org.onosproject.store.primitives.TransactionId;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070039import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
40import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
41import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
Jordan Halterman71635ae2017-07-28 10:35:43 -070043import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Remove;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveValue;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveVersion;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Replace;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceValue;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceVersion;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070050import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
51import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
54import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070055import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080056import org.onosproject.store.service.AsyncConsistentMap;
Jordan Haltermandae11602018-07-03 00:00:47 -070057import org.onosproject.store.service.AsyncIterator;
Jordan Halterman70df7672017-08-03 16:25:19 -070058import org.onosproject.store.service.ConsistentMapException;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080059import org.onosproject.store.service.MapEvent;
60import org.onosproject.store.service.MapEventListener;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070061import org.onosproject.store.service.Serializer;
Jordan Halterman948d6592017-04-20 17:18:24 -070062import org.onosproject.store.service.TransactionLog;
63import org.onosproject.store.service.Version;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080064import org.onosproject.store.service.Versioned;
Madan Jampani8e18c0e2016-06-20 14:08:16 -070065
Jordan Halterman2bf177c2017-06-29 01:49:08 -070066import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapEvents.CHANGE;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
Jordan Haltermandae11602018-07-03 00:00:47 -070070import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLOSE_ITERATOR;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070071import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ENTRY_SET;
75import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
76import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
77import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
Jordan Haltermandae11602018-07-03 00:00:47 -070078import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorBatch;
79import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorPosition;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070080import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
Jordan Haltermandae11602018-07-03 00:00:47 -070081import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.NEXT;
82import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.OPEN_ITERATOR;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070083import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
84import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
Jordan Halterman71635ae2017-07-28 10:35:43 -070085import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
86import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_AND_GET;
87import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_IF_ABSENT;
88import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070089import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
Jordan Halterman71635ae2017-07-28 10:35:43 -070090import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VALUE;
91import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VERSION;
92import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE;
93import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VALUE;
94import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VERSION;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070095import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
96import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070097import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080098
99/**
100 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
101 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700102public class AtomixConsistentMap extends AbstractRaftPrimitive implements AsyncConsistentMap<String, byte[]> {
103 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
Jordan Haltermandae11602018-07-03 00:00:47 -0700104 .register(KryoNamespaces.BASIC)
105 .register(AtomixConsistentMapOperations.NAMESPACE)
106 .register(AtomixConsistentMapEvents.NAMESPACE)
107 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
108 .build());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800109
Madan Jampani8e18c0e2016-06-20 14:08:16 -0700110 private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800111
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700112 public AtomixConsistentMap(RaftProxy proxy) {
113 super(proxy);
114 proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
115 proxy.addStateChangeListener(state -> {
116 if (state == RaftProxy.State.CONNECTED && isListening()) {
117 proxy.invoke(ADD_LISTENER);
118 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800119 });
120 }
121
Jordan Halterman71635ae2017-07-28 10:35:43 -0700122 protected Serializer serializer() {
123 return SERIALIZER;
124 }
125
Madan Jampanifc981772016-02-16 09:46:42 -0800126 private void handleEvent(List<MapEvent<String, byte[]>> events) {
Madan Jampani0463cf92016-05-04 14:46:08 -0700127 events.forEach(event ->
Jordan Haltermandae11602018-07-03 00:00:47 -0700128 mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800129 }
130
131 @Override
132 public CompletableFuture<Boolean> isEmpty() {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700133 return proxy.invoke(IS_EMPTY, serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800134 }
135
136 @Override
137 public CompletableFuture<Integer> size() {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700138 return proxy.invoke(SIZE, serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800139 }
140
141 @Override
142 public CompletableFuture<Boolean> containsKey(String key) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700143 return proxy.invoke(CONTAINS_KEY, serializer()::encode, new ContainsKey(key), serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800144 }
145
146 @Override
147 public CompletableFuture<Boolean> containsValue(byte[] value) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700148 return proxy.invoke(CONTAINS_VALUE, serializer()::encode, new ContainsValue(value), serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800149 }
150
151 @Override
152 public CompletableFuture<Versioned<byte[]>> get(String key) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700153 return proxy.invoke(GET, serializer()::encode, new Get(key), serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800154 }
155
156 @Override
Jordan Haltermanf6272442017-04-20 02:18:08 -0700157 public CompletableFuture<Versioned<byte[]>> getOrDefault(String key, byte[] defaultValue) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700158 return proxy.invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700159 GET_OR_DEFAULT,
160 serializer()::encode,
161 new GetOrDefault(key, defaultValue),
162 serializer()::decode);
Jordan Haltermanf6272442017-04-20 02:18:08 -0700163 }
164
165 @Override
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800166 public CompletableFuture<Set<String>> keySet() {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700167 return proxy.invoke(KEY_SET, serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800168 }
169
170 @Override
171 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700172 return proxy.invoke(VALUES, serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800173 }
174
175 @Override
176 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700177 return proxy.invoke(ENTRY_SET, serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800178 }
179
180 @Override
181 @SuppressWarnings("unchecked")
182 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700183 return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700184 PUT,
185 serializer()::encode,
186 new Put(key, value),
187 serializer()::decode)
188 .whenComplete((r, e) -> throwIfLocked(r))
189 .thenApply(v -> v.result());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800190 }
191
192 @Override
193 @SuppressWarnings("unchecked")
194 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700195 return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700196 PUT_AND_GET,
197 serializer()::encode,
198 new Put(key, value),
199 serializer()::decode)
200 .whenComplete((r, e) -> throwIfLocked(r))
201 .thenApply(v -> v.result());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800202 }
203
204 @Override
205 @SuppressWarnings("unchecked")
206 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700207 return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700208 PUT_IF_ABSENT,
209 serializer()::encode,
210 new Put(key, value),
211 serializer()::decode)
212 .whenComplete((r, e) -> throwIfLocked(r))
213 .thenApply(v -> v.result());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800214 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700215
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800216 @Override
217 @SuppressWarnings("unchecked")
218 public CompletableFuture<Versioned<byte[]>> remove(String key) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700219 return proxy.<Remove, MapEntryUpdateResult<String, byte[]>>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700220 REMOVE,
221 serializer()::encode,
222 new Remove(key),
223 serializer()::decode)
224 .whenComplete((r, e) -> throwIfLocked(r))
225 .thenApply(v -> v.result());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800226 }
227
228 @Override
229 @SuppressWarnings("unchecked")
230 public CompletableFuture<Boolean> remove(String key, byte[] value) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700231 return proxy.<RemoveValue, MapEntryUpdateResult<String, byte[]>>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700232 REMOVE_VALUE,
233 serializer()::encode,
234 new RemoveValue(key, value),
235 serializer()::decode)
236 .whenComplete((r, e) -> throwIfLocked(r))
237 .thenApply(v -> v.updated());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800238 }
239
240 @Override
241 @SuppressWarnings("unchecked")
242 public CompletableFuture<Boolean> remove(String key, long version) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700243 return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700244 REMOVE_VERSION,
245 serializer()::encode,
246 new RemoveVersion(key, version),
247 serializer()::decode)
248 .whenComplete((r, e) -> throwIfLocked(r))
249 .thenApply(v -> v.updated());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800250 }
251
252 @Override
253 @SuppressWarnings("unchecked")
254 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700255 return proxy.<Replace, MapEntryUpdateResult<String, byte[]>>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700256 REPLACE,
257 serializer()::encode,
258 new Replace(key, value),
259 serializer()::decode)
260 .whenComplete((r, e) -> throwIfLocked(r))
261 .thenApply(v -> v.result());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800262 }
263
264 @Override
265 @SuppressWarnings("unchecked")
266 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700267 return proxy.<ReplaceValue, MapEntryUpdateResult<String, byte[]>>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700268 REPLACE_VALUE,
269 serializer()::encode,
270 new ReplaceValue(key, oldValue, newValue),
271 serializer()::decode)
272 .whenComplete((r, e) -> throwIfLocked(r))
273 .thenApply(v -> v.updated());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800274 }
275
276 @Override
277 @SuppressWarnings("unchecked")
278 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700279 return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700280 REPLACE_VERSION,
281 serializer()::encode,
282 new ReplaceVersion(key, oldVersion, newValue),
283 serializer()::decode)
284 .whenComplete((r, e) -> throwIfLocked(r))
285 .thenApply(v -> v.updated());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800286 }
287
288 @Override
289 public CompletableFuture<Void> clear() {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700290 return proxy.<MapEntryUpdateResult.Status>invoke(CLEAR, serializer()::decode)
Jordan Haltermandae11602018-07-03 00:00:47 -0700291 .whenComplete((r, e) -> throwIfLocked(r))
292 .thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800293 }
294
295 @Override
296 @SuppressWarnings("unchecked")
297 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
Jordan Haltermandae11602018-07-03 00:00:47 -0700298 Predicate<? super byte[]> condition,
299 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800300 return get(key).thenCompose(r1 -> {
301 byte[] existingValue = r1 == null ? null : r1.value();
302 // if the condition evaluates to false, return existing value.
303 if (!condition.test(existingValue)) {
304 return CompletableFuture.completedFuture(r1);
305 }
306
Jordan Halterman71635ae2017-07-28 10:35:43 -0700307 byte[] computedValue;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800308 try {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700309 computedValue = remappingFunction.apply(key, existingValue);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800310 } catch (Exception e) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700311 return Tools.exceptionalFuture(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800312 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700313
314 if (computedValue == null && r1 == null) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800315 return CompletableFuture.completedFuture(null);
316 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700317
318 if (r1 == null) {
319 return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700320 PUT_IF_ABSENT,
321 serializer()::encode,
322 new Put(key, computedValue),
323 serializer()::decode)
324 .whenComplete((r, e) -> throwIfLocked(r))
325 .thenCompose(r -> checkLocked(r))
326 .thenApply(result -> new Versioned<>(computedValue, result.version()));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700327 } else if (computedValue == null) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700328 return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700329 REMOVE_VERSION,
330 serializer()::encode,
331 new RemoveVersion(key, r1.version()),
332 serializer()::decode)
333 .whenComplete((r, e) -> throwIfLocked(r))
334 .thenCompose(r -> checkLocked(r))
335 .thenApply(v -> null);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700336 } else {
337 return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700338 REPLACE_VERSION,
339 serializer()::encode,
340 new ReplaceVersion(key, r1.version(), computedValue),
341 serializer()::decode)
342 .whenComplete((r, e) -> throwIfLocked(r))
343 .thenCompose(r -> checkLocked(r))
344 .thenApply(result -> result.status() == MapEntryUpdateResult.Status.OK
345 ? new Versioned(computedValue, result.version()) : result.result());
Jordan Halterman71635ae2017-07-28 10:35:43 -0700346 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800347 });
348 }
349
Jordan Halterman70df7672017-08-03 16:25:19 -0700350 private CompletableFuture<MapEntryUpdateResult<String, byte[]>> checkLocked(
Jordan Haltermandae11602018-07-03 00:00:47 -0700351 MapEntryUpdateResult<String, byte[]> result) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700352 if (result.status() == MapEntryUpdateResult.Status.PRECONDITION_FAILED ||
Jordan Haltermandae11602018-07-03 00:00:47 -0700353 result.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700354 return Tools.exceptionalFuture(new ConsistentMapException.ConcurrentModification());
355 }
356 return CompletableFuture.completedFuture(result);
357 }
358
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800359 @Override
Madan Jampani0463cf92016-05-04 14:46:08 -0700360 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
Jordan Haltermandae11602018-07-03 00:00:47 -0700361 Executor executor) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800362 if (mapEventListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700363 return proxy.invoke(ADD_LISTENER).thenRun(() -> mapEventListeners.put(listener, executor));
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800364 } else {
Madan Jampani0463cf92016-05-04 14:46:08 -0700365 mapEventListeners.put(listener, executor);
Madan Jampani40f022e2016-03-02 21:35:14 -0800366 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800367 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800368 }
369
370 @Override
371 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
Madan Jampani0463cf92016-05-04 14:46:08 -0700372 if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700373 return proxy.invoke(REMOVE_LISTENER).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800374 }
375 return CompletableFuture.completedFuture(null);
376 }
377
Jordan Halterman71635ae2017-07-28 10:35:43 -0700378 private void throwIfLocked(MapEntryUpdateResult<String, byte[]> result) {
379 if (result != null) {
380 throwIfLocked(result.status());
381 }
382 }
383
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800384 private void throwIfLocked(MapEntryUpdateResult.Status status) {
385 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
386 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
387 }
388 }
389
Madan Jampanicadd70b2016-02-08 13:45:43 -0800390 @Override
Jordan Halterman948d6592017-04-20 17:18:24 -0700391 public CompletableFuture<Version> begin(TransactionId transactionId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700392 return proxy.<TransactionBegin, Long>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700393 BEGIN,
394 serializer()::encode,
395 new TransactionBegin(transactionId),
396 serializer()::decode)
397 .thenApply(Version::new);
Jordan Halterman948d6592017-04-20 17:18:24 -0700398 }
399
400 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700401 public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
402 return proxy.<TransactionPrepare, PrepareResult>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700403 PREPARE,
404 serializer()::encode,
405 new TransactionPrepare(transactionLog),
406 serializer()::decode)
407 .thenApply(v -> v == PrepareResult.OK);
Jordan Halterman948d6592017-04-20 17:18:24 -0700408 }
409
410 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700411 public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
412 return proxy.<TransactionPrepareAndCommit, PrepareResult>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700413 PREPARE_AND_COMMIT,
414 serializer()::encode,
415 new TransactionPrepareAndCommit(transactionLog),
416 serializer()::decode)
417 .thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800418 }
419
420 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800421 public CompletableFuture<Void> commit(TransactionId transactionId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700422 return proxy.<TransactionCommit, CommitResult>invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700423 COMMIT,
424 serializer()::encode,
425 new TransactionCommit(transactionId),
426 serializer()::decode)
427 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800428 }
429
430 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800431 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700432 return proxy.invoke(
Jordan Haltermandae11602018-07-03 00:00:47 -0700433 ROLLBACK,
434 serializer()::encode,
435 new TransactionRollback(transactionId),
436 serializer()::decode)
437 .thenApply(v -> null);
Madan Jampani1d3b6172016-04-28 13:22:57 -0700438 }
Madan Jampanifb786382016-06-13 10:25:35 -0700439
440 private boolean isListening() {
441 return !mapEventListeners.isEmpty();
442 }
Jordan Haltermandae11602018-07-03 00:00:47 -0700443
444 @Override
445 public CompletableFuture<AsyncIterator<Entry<String, Versioned<byte[]>>>> iterator() {
446 return proxy.<Long>invoke(OPEN_ITERATOR, SERIALIZER::decode)
447 .thenApply(ConsistentMultimapIterator::new);
448 }
449
450 /**
451 * Consistent multimap iterator.
452 */
453 private class ConsistentMultimapIterator implements AsyncIterator<Map.Entry<String, Versioned<byte[]>>> {
454 private final long id;
455 private volatile CompletableFuture<IteratorBatch> batch;
456 private volatile CompletableFuture<Void> closeFuture;
457
458 ConsistentMultimapIterator(long id) {
459 this.id = id;
460 this.batch = CompletableFuture.completedFuture(
461 new IteratorBatch(0, Collections.emptyList()));
462 }
463
464 /**
465 * Returns the current batch iterator or lazily fetches the next batch from the cluster.
466 *
467 * @return the next batch iterator
468 */
469 private CompletableFuture<Iterator<Entry<String, Versioned<byte[]>>>> batch() {
470 return batch.thenCompose(iterator -> {
471 if (iterator != null && !iterator.hasNext()) {
472 batch = fetch(iterator.position());
473 return batch.thenApply(Function.identity());
474 }
475 return CompletableFuture.completedFuture(iterator);
476 });
477 }
478
479 /**
480 * Fetches the next batch of entries from the cluster.
481 *
482 * @param position the position from which to fetch the next batch
483 * @return the next batch of entries from the cluster
484 */
485 private CompletableFuture<IteratorBatch> fetch(int position) {
486 return proxy.<IteratorPosition, IteratorBatch>invoke(
487 NEXT,
488 SERIALIZER::encode,
489 new IteratorPosition(id, position),
490 SERIALIZER::decode)
491 .thenCompose(batch -> {
492 if (batch == null) {
493 return close().thenApply(v -> null);
494 }
495 return CompletableFuture.completedFuture(batch);
496 });
497 }
498
499 /**
500 * Closes the iterator.
501 *
502 * @return future to be completed once the iterator has been closed
503 */
504 private CompletableFuture<Void> close() {
505 if (closeFuture == null) {
506 synchronized (this) {
507 if (closeFuture == null) {
508 closeFuture = proxy.invoke(CLOSE_ITERATOR, SERIALIZER::encode, id);
509 }
510 }
511 }
512 return closeFuture;
513 }
514
515 @Override
516 public CompletableFuture<Boolean> hasNext() {
517 return batch().thenApply(iterator -> iterator != null && iterator.hasNext());
518 }
519
520 @Override
521 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> next() {
522 return batch().thenCompose(iterator -> {
523 if (iterator == null) {
524 return Tools.exceptionalFuture(new NoSuchElementException());
525 }
526 return CompletableFuture.completedFuture(iterator.next());
527 });
528 }
529 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700530}