blob: ad3bbb63f54014fff0b9ba578a86b85e56a3f776 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import java.util.Collection;
19import java.util.ConcurrentModificationException;
Madan Jampanifc981772016-02-16 09:46:42 -080020import java.util.List;
Madan Jampani0463cf92016-05-04 14:46:08 -070021import java.util.Map;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080022import java.util.Map.Entry;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070025import java.util.concurrent.ConcurrentHashMap;
Madan Jampani0463cf92016-05-04 14:46:08 -070026import java.util.concurrent.Executor;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.function.BiFunction;
28import java.util.function.Predicate;
29
Jordan Halterman2bf177c2017-06-29 01:49:08 -070030import io.atomix.protocols.raft.proxy.RaftProxy;
31import org.onlab.util.KryoNamespace;
Jordan Halterman70df7672017-08-03 16:25:19 -070032import org.onlab.util.Tools;
Jordan Halterman948d6592017-04-20 17:18:24 -070033import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080034import org.onosproject.store.primitives.TransactionId;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070035import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
36import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
37import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
38import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
Jordan Halterman71635ae2017-07-28 10:35:43 -070039import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
40import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Remove;
41import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveValue;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveVersion;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Replace;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceValue;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceVersion;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070046import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070051import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080052import org.onosproject.store.service.AsyncConsistentMap;
Jordan Halterman70df7672017-08-03 16:25:19 -070053import org.onosproject.store.service.ConsistentMapException;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080054import org.onosproject.store.service.MapEvent;
55import org.onosproject.store.service.MapEventListener;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070056import org.onosproject.store.service.Serializer;
Jordan Halterman948d6592017-04-20 17:18:24 -070057import org.onosproject.store.service.TransactionLog;
58import org.onosproject.store.service.Version;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080059import org.onosproject.store.service.Versioned;
Madan Jampani8e18c0e2016-06-20 14:08:16 -070060
Jordan Halterman2bf177c2017-06-29 01:49:08 -070061import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapEvents.CHANGE;
62import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ENTRY_SET;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
Jordan Halterman71635ae2017-07-28 10:35:43 -070075import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
76import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_AND_GET;
77import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_IF_ABSENT;
78import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070079import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
Jordan Halterman71635ae2017-07-28 10:35:43 -070080import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VALUE;
81import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VERSION;
82import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE;
83import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VALUE;
84import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VERSION;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070085import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
86import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070087import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080088
89/**
90 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
91 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070092public class AtomixConsistentMap extends AbstractRaftPrimitive implements AsyncConsistentMap<String, byte[]> {
93 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
94 .register(KryoNamespaces.BASIC)
95 .register(AtomixConsistentMapOperations.NAMESPACE)
96 .register(AtomixConsistentMapEvents.NAMESPACE)
Jordan Halterman71635ae2017-07-28 10:35:43 -070097 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
Jordan Halterman2bf177c2017-06-29 01:49:08 -070098 .build());
Madan Jampani5e5b3d62016-02-01 16:03:33 -080099
Madan Jampani8e18c0e2016-06-20 14:08:16 -0700100 private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800101
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700102 public AtomixConsistentMap(RaftProxy proxy) {
103 super(proxy);
104 proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
105 proxy.addStateChangeListener(state -> {
106 if (state == RaftProxy.State.CONNECTED && isListening()) {
107 proxy.invoke(ADD_LISTENER);
108 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800109 });
110 }
111
Jordan Halterman71635ae2017-07-28 10:35:43 -0700112 protected Serializer serializer() {
113 return SERIALIZER;
114 }
115
Madan Jampanifc981772016-02-16 09:46:42 -0800116 private void handleEvent(List<MapEvent<String, byte[]>> events) {
Madan Jampani0463cf92016-05-04 14:46:08 -0700117 events.forEach(event ->
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700118 mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800119 }
120
121 @Override
122 public CompletableFuture<Boolean> isEmpty() {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700123 return proxy.invoke(IS_EMPTY, serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800124 }
125
126 @Override
127 public CompletableFuture<Integer> size() {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700128 return proxy.invoke(SIZE, serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800129 }
130
131 @Override
132 public CompletableFuture<Boolean> containsKey(String key) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700133 return proxy.invoke(CONTAINS_KEY, serializer()::encode, new ContainsKey(key), serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800134 }
135
136 @Override
137 public CompletableFuture<Boolean> containsValue(byte[] value) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700138 return proxy.invoke(CONTAINS_VALUE, serializer()::encode, new ContainsValue(value), serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800139 }
140
141 @Override
142 public CompletableFuture<Versioned<byte[]>> get(String key) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700143 return proxy.invoke(GET, serializer()::encode, new Get(key), serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800144 }
145
146 @Override
Jordan Haltermanf6272442017-04-20 02:18:08 -0700147 public CompletableFuture<Versioned<byte[]>> getOrDefault(String key, byte[] defaultValue) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700148 return proxy.invoke(
149 GET_OR_DEFAULT,
Jordan Halterman71635ae2017-07-28 10:35:43 -0700150 serializer()::encode,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700151 new GetOrDefault(key, defaultValue),
Jordan Halterman71635ae2017-07-28 10:35:43 -0700152 serializer()::decode);
Jordan Haltermanf6272442017-04-20 02:18:08 -0700153 }
154
155 @Override
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800156 public CompletableFuture<Set<String>> keySet() {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700157 return proxy.invoke(KEY_SET, serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800158 }
159
160 @Override
161 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700162 return proxy.invoke(VALUES, serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800163 }
164
165 @Override
166 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700167 return proxy.invoke(ENTRY_SET, serializer()::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800168 }
169
170 @Override
171 @SuppressWarnings("unchecked")
172 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700173 return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
174 PUT,
175 serializer()::encode,
176 new Put(key, value),
177 serializer()::decode)
178 .whenComplete((r, e) -> throwIfLocked(r))
179 .thenApply(v -> v.result());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800180 }
181
182 @Override
183 @SuppressWarnings("unchecked")
184 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700185 return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
186 PUT_AND_GET,
187 serializer()::encode,
188 new Put(key, value),
189 serializer()::decode)
190 .whenComplete((r, e) -> throwIfLocked(r))
191 .thenApply(v -> v.result());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800192 }
193
194 @Override
195 @SuppressWarnings("unchecked")
196 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700197 return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
198 PUT_IF_ABSENT,
199 serializer()::encode,
200 new Put(key, value),
201 serializer()::decode)
202 .whenComplete((r, e) -> throwIfLocked(r))
203 .thenApply(v -> v.result());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800204 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700205
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800206 @Override
207 @SuppressWarnings("unchecked")
208 public CompletableFuture<Versioned<byte[]>> remove(String key) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700209 return proxy.<Remove, MapEntryUpdateResult<String, byte[]>>invoke(
210 REMOVE,
211 serializer()::encode,
212 new Remove(key),
213 serializer()::decode)
214 .whenComplete((r, e) -> throwIfLocked(r))
215 .thenApply(v -> v.result());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800216 }
217
218 @Override
219 @SuppressWarnings("unchecked")
220 public CompletableFuture<Boolean> remove(String key, byte[] value) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700221 return proxy.<RemoveValue, MapEntryUpdateResult<String, byte[]>>invoke(
222 REMOVE_VALUE,
223 serializer()::encode,
224 new RemoveValue(key, value),
225 serializer()::decode)
226 .whenComplete((r, e) -> throwIfLocked(r))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800227 .thenApply(v -> v.updated());
228 }
229
230 @Override
231 @SuppressWarnings("unchecked")
232 public CompletableFuture<Boolean> remove(String key, long version) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700233 return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
234 REMOVE_VERSION,
235 serializer()::encode,
236 new RemoveVersion(key, version),
237 serializer()::decode)
238 .whenComplete((r, e) -> throwIfLocked(r))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800239 .thenApply(v -> v.updated());
240 }
241
242 @Override
243 @SuppressWarnings("unchecked")
244 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700245 return proxy.<Replace, MapEntryUpdateResult<String, byte[]>>invoke(
246 REPLACE,
247 serializer()::encode,
248 new Replace(key, value),
249 serializer()::decode)
250 .whenComplete((r, e) -> throwIfLocked(r))
251 .thenApply(v -> v.result());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800252 }
253
254 @Override
255 @SuppressWarnings("unchecked")
256 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700257 return proxy.<ReplaceValue, MapEntryUpdateResult<String, byte[]>>invoke(
258 REPLACE_VALUE,
259 serializer()::encode,
260 new ReplaceValue(key, oldValue, newValue),
261 serializer()::decode)
262 .whenComplete((r, e) -> throwIfLocked(r))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800263 .thenApply(v -> v.updated());
264 }
265
266 @Override
267 @SuppressWarnings("unchecked")
268 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700269 return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
270 REPLACE_VERSION,
271 serializer()::encode,
272 new ReplaceVersion(key, oldVersion, newValue),
273 serializer()::decode)
274 .whenComplete((r, e) -> throwIfLocked(r))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800275 .thenApply(v -> v.updated());
276 }
277
278 @Override
279 public CompletableFuture<Void> clear() {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700280 return proxy.<MapEntryUpdateResult.Status>invoke(CLEAR, serializer()::decode)
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800281 .whenComplete((r, e) -> throwIfLocked(r))
282 .thenApply(v -> null);
283 }
284
285 @Override
286 @SuppressWarnings("unchecked")
287 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
288 Predicate<? super byte[]> condition,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700289 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800290 return get(key).thenCompose(r1 -> {
291 byte[] existingValue = r1 == null ? null : r1.value();
292 // if the condition evaluates to false, return existing value.
293 if (!condition.test(existingValue)) {
294 return CompletableFuture.completedFuture(r1);
295 }
296
Jordan Halterman71635ae2017-07-28 10:35:43 -0700297 byte[] computedValue;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800298 try {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700299 computedValue = remappingFunction.apply(key, existingValue);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800300 } catch (Exception e) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700301 return Tools.exceptionalFuture(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800302 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700303
304 if (computedValue == null && r1 == null) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800305 return CompletableFuture.completedFuture(null);
306 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700307
308 if (r1 == null) {
309 return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
310 PUT_IF_ABSENT,
311 serializer()::encode,
312 new Put(key, computedValue),
313 serializer()::decode)
314 .whenComplete((r, e) -> throwIfLocked(r))
Jordan Halterman70df7672017-08-03 16:25:19 -0700315 .thenCompose(r -> checkLocked(r))
Jordan Halterman71635ae2017-07-28 10:35:43 -0700316 .thenApply(result -> new Versioned<>(computedValue, result.version()));
317 } else if (computedValue == null) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700318 return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
319 REMOVE_VERSION,
320 serializer()::encode,
321 new RemoveVersion(key, r1.version()),
322 serializer()::decode)
323 .whenComplete((r, e) -> throwIfLocked(r))
324 .thenCompose(r -> checkLocked(r))
325 .thenApply(v -> null);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700326 } else {
327 return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
328 REPLACE_VERSION,
329 serializer()::encode,
330 new ReplaceVersion(key, r1.version(), computedValue),
331 serializer()::decode)
332 .whenComplete((r, e) -> throwIfLocked(r))
Jordan Halterman70df7672017-08-03 16:25:19 -0700333 .thenCompose(r -> checkLocked(r))
Jordan Halterman71635ae2017-07-28 10:35:43 -0700334 .thenApply(result -> result.status() == MapEntryUpdateResult.Status.OK
335 ? new Versioned(computedValue, result.version()) : result.result());
336 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800337 });
338 }
339
Jordan Halterman70df7672017-08-03 16:25:19 -0700340 private CompletableFuture<MapEntryUpdateResult<String, byte[]>> checkLocked(
341 MapEntryUpdateResult<String, byte[]> result) {
342 if (result.status() == MapEntryUpdateResult.Status.PRECONDITION_FAILED ||
343 result.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
344 return Tools.exceptionalFuture(new ConsistentMapException.ConcurrentModification());
345 }
346 return CompletableFuture.completedFuture(result);
347 }
348
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800349 @Override
Madan Jampani0463cf92016-05-04 14:46:08 -0700350 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700351 Executor executor) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800352 if (mapEventListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700353 return proxy.invoke(ADD_LISTENER).thenRun(() -> mapEventListeners.put(listener, executor));
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800354 } else {
Madan Jampani0463cf92016-05-04 14:46:08 -0700355 mapEventListeners.put(listener, executor);
Madan Jampani40f022e2016-03-02 21:35:14 -0800356 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800357 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800358 }
359
360 @Override
361 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
Madan Jampani0463cf92016-05-04 14:46:08 -0700362 if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700363 return proxy.invoke(REMOVE_LISTENER).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800364 }
365 return CompletableFuture.completedFuture(null);
366 }
367
Jordan Halterman71635ae2017-07-28 10:35:43 -0700368 private void throwIfLocked(MapEntryUpdateResult<String, byte[]> result) {
369 if (result != null) {
370 throwIfLocked(result.status());
371 }
372 }
373
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800374 private void throwIfLocked(MapEntryUpdateResult.Status status) {
375 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
376 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
377 }
378 }
379
Madan Jampanicadd70b2016-02-08 13:45:43 -0800380 @Override
Jordan Halterman948d6592017-04-20 17:18:24 -0700381 public CompletableFuture<Version> begin(TransactionId transactionId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700382 return proxy.<TransactionBegin, Long>invoke(
383 BEGIN,
Jordan Halterman71635ae2017-07-28 10:35:43 -0700384 serializer()::encode,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700385 new TransactionBegin(transactionId),
Jordan Halterman71635ae2017-07-28 10:35:43 -0700386 serializer()::decode)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700387 .thenApply(Version::new);
Jordan Halterman948d6592017-04-20 17:18:24 -0700388 }
389
390 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700391 public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
392 return proxy.<TransactionPrepare, PrepareResult>invoke(
393 PREPARE,
Jordan Halterman71635ae2017-07-28 10:35:43 -0700394 serializer()::encode,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700395 new TransactionPrepare(transactionLog),
Jordan Halterman71635ae2017-07-28 10:35:43 -0700396 serializer()::decode)
Jordan Halterman948d6592017-04-20 17:18:24 -0700397 .thenApply(v -> v == PrepareResult.OK);
398 }
399
400 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700401 public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
402 return proxy.<TransactionPrepareAndCommit, PrepareResult>invoke(
403 PREPARE_AND_COMMIT,
Jordan Halterman71635ae2017-07-28 10:35:43 -0700404 serializer()::encode,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700405 new TransactionPrepareAndCommit(transactionLog),
Jordan Halterman71635ae2017-07-28 10:35:43 -0700406 serializer()::decode)
Jordan Halterman948d6592017-04-20 17:18:24 -0700407 .thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800408 }
409
410 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800411 public CompletableFuture<Void> commit(TransactionId transactionId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700412 return proxy.<TransactionCommit, CommitResult>invoke(
413 COMMIT,
Jordan Halterman71635ae2017-07-28 10:35:43 -0700414 serializer()::encode,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700415 new TransactionCommit(transactionId),
Jordan Halterman71635ae2017-07-28 10:35:43 -0700416 serializer()::decode)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700417 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800418 }
419
420 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800421 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700422 return proxy.invoke(
423 ROLLBACK,
Jordan Halterman71635ae2017-07-28 10:35:43 -0700424 serializer()::encode,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700425 new TransactionRollback(transactionId),
Jordan Halterman71635ae2017-07-28 10:35:43 -0700426 serializer()::decode)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700427 .thenApply(v -> null);
Madan Jampani1d3b6172016-04-28 13:22:57 -0700428 }
Madan Jampanifb786382016-06-13 10:25:35 -0700429
430 private boolean isListening() {
431 return !mapEventListeners.isEmpty();
432 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700433}