blob: f3a1ea2d8c45dcb66259847f1b6928dcadf0aa24 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import java.util.Collection;
19import java.util.ConcurrentModificationException;
Madan Jampanifc981772016-02-16 09:46:42 -080020import java.util.List;
Madan Jampani0463cf92016-05-04 14:46:08 -070021import java.util.Map;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080022import java.util.Map.Entry;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070025import java.util.concurrent.ConcurrentHashMap;
Madan Jampani0463cf92016-05-04 14:46:08 -070026import java.util.concurrent.Executor;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.concurrent.atomic.AtomicReference;
28import java.util.function.BiFunction;
29import java.util.function.Predicate;
30
Jordan Halterman2bf177c2017-06-29 01:49:08 -070031import io.atomix.protocols.raft.proxy.RaftProxy;
32import org.onlab.util.KryoNamespace;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080033import org.onlab.util.Match;
Madan Jampani80d2e972016-07-12 17:11:52 -070034import org.onlab.util.Tools;
Jordan Halterman948d6592017-04-20 17:18:24 -070035import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080036import org.onosproject.store.primitives.TransactionId;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070037import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
38import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
39import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
40import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
41import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UpdateAndGet;
47import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080048import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampani80d2e972016-07-12 17:11:52 -070049import org.onosproject.store.service.ConsistentMapException;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080050import org.onosproject.store.service.MapEvent;
51import org.onosproject.store.service.MapEventListener;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070052import org.onosproject.store.service.Serializer;
Jordan Halterman948d6592017-04-20 17:18:24 -070053import org.onosproject.store.service.TransactionLog;
54import org.onosproject.store.service.Version;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080055import org.onosproject.store.service.Versioned;
Madan Jampani8e18c0e2016-06-20 14:08:16 -070056
Jordan Halterman2bf177c2017-06-29 01:49:08 -070057import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapEvents.CHANGE;
58import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
59import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
60import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
61import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
62import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ENTRY_SET;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UPDATE_AND_GET;
75import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080076
77/**
78 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
79 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070080public class AtomixConsistentMap extends AbstractRaftPrimitive implements AsyncConsistentMap<String, byte[]> {
81 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
82 .register(KryoNamespaces.BASIC)
83 .register(AtomixConsistentMapOperations.NAMESPACE)
84 .register(AtomixConsistentMapEvents.NAMESPACE)
85 .build());
Madan Jampani5e5b3d62016-02-01 16:03:33 -080086
Madan Jampani8e18c0e2016-06-20 14:08:16 -070087 private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080088
Jordan Halterman2bf177c2017-06-29 01:49:08 -070089 public AtomixConsistentMap(RaftProxy proxy) {
90 super(proxy);
91 proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
92 proxy.addStateChangeListener(state -> {
93 if (state == RaftProxy.State.CONNECTED && isListening()) {
94 proxy.invoke(ADD_LISTENER);
95 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -080096 });
97 }
98
Madan Jampanifc981772016-02-16 09:46:42 -080099 private void handleEvent(List<MapEvent<String, byte[]>> events) {
Madan Jampani0463cf92016-05-04 14:46:08 -0700100 events.forEach(event ->
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700101 mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800102 }
103
104 @Override
105 public CompletableFuture<Boolean> isEmpty() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700106 return proxy.invoke(IS_EMPTY, SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800107 }
108
109 @Override
110 public CompletableFuture<Integer> size() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700111 return proxy.invoke(SIZE, SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800112 }
113
114 @Override
115 public CompletableFuture<Boolean> containsKey(String key) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700116 return proxy.invoke(CONTAINS_KEY, SERIALIZER::encode, new ContainsKey(key), SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800117 }
118
119 @Override
120 public CompletableFuture<Boolean> containsValue(byte[] value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700121 return proxy.invoke(CONTAINS_VALUE, SERIALIZER::encode, new ContainsValue(value), SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800122 }
123
124 @Override
125 public CompletableFuture<Versioned<byte[]>> get(String key) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700126 return proxy.invoke(GET, SERIALIZER::encode, new Get(key), SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800127 }
128
129 @Override
Jordan Haltermanf6272442017-04-20 02:18:08 -0700130 public CompletableFuture<Versioned<byte[]>> getOrDefault(String key, byte[] defaultValue) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700131 return proxy.invoke(
132 GET_OR_DEFAULT,
133 SERIALIZER::encode,
134 new GetOrDefault(key, defaultValue),
135 SERIALIZER::decode);
Jordan Haltermanf6272442017-04-20 02:18:08 -0700136 }
137
138 @Override
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800139 public CompletableFuture<Set<String>> keySet() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700140 return proxy.invoke(KEY_SET, SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800141 }
142
143 @Override
144 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700145 return proxy.invoke(VALUES, SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800146 }
147
148 @Override
149 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700150 return proxy.invoke(ENTRY_SET, SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800151 }
152
153 @Override
154 @SuppressWarnings("unchecked")
155 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700156 return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
157 UPDATE_AND_GET,
158 SERIALIZER::encode,
159 new UpdateAndGet(key, value, Match.ANY, Match.ANY),
160 SERIALIZER::decode)
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800161 .whenComplete((r, e) -> throwIfLocked(r.status()))
162 .thenApply(v -> v.oldValue());
163 }
164
165 @Override
166 @SuppressWarnings("unchecked")
167 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700168 return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
169 UPDATE_AND_GET,
170 SERIALIZER::encode,
171 new UpdateAndGet(key, value, Match.ANY, Match.ANY),
172 SERIALIZER::decode)
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800173 .whenComplete((r, e) -> throwIfLocked(r.status()))
174 .thenApply(v -> v.newValue());
175 }
176
177 @Override
178 @SuppressWarnings("unchecked")
179 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700180 return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
181 UPDATE_AND_GET,
182 SERIALIZER::encode,
183 new UpdateAndGet(key, value, Match.NULL, Match.ANY),
184 SERIALIZER::decode)
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800185 .whenComplete((r, e) -> throwIfLocked(r.status()))
186 .thenApply(v -> v.oldValue());
187 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700188
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800189 @Override
190 @SuppressWarnings("unchecked")
191 public CompletableFuture<Versioned<byte[]>> remove(String key) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700192 return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
193 UPDATE_AND_GET,
194 SERIALIZER::encode,
195 new UpdateAndGet(key, null, Match.ANY, Match.ANY),
196 SERIALIZER::decode)
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800197 .whenComplete((r, e) -> throwIfLocked(r.status()))
198 .thenApply(v -> v.oldValue());
199 }
200
201 @Override
202 @SuppressWarnings("unchecked")
203 public CompletableFuture<Boolean> remove(String key, byte[] value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700204 return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
205 UPDATE_AND_GET,
206 SERIALIZER::encode,
207 new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY),
208 SERIALIZER::decode)
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800209 .whenComplete((r, e) -> throwIfLocked(r.status()))
210 .thenApply(v -> v.updated());
211 }
212
213 @Override
214 @SuppressWarnings("unchecked")
215 public CompletableFuture<Boolean> remove(String key, long version) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700216 return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
217 UPDATE_AND_GET,
218 SERIALIZER::encode,
219 new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)),
220 SERIALIZER::decode)
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800221 .whenComplete((r, e) -> throwIfLocked(r.status()))
222 .thenApply(v -> v.updated());
223 }
224
225 @Override
226 @SuppressWarnings("unchecked")
227 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700228 return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
229 UPDATE_AND_GET,
230 SERIALIZER::encode,
231 new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY),
232 SERIALIZER::decode)
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800233 .whenComplete((r, e) -> throwIfLocked(r.status()))
234 .thenApply(v -> v.oldValue());
235 }
236
237 @Override
238 @SuppressWarnings("unchecked")
239 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700240 return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
241 UPDATE_AND_GET,
242 SERIALIZER::encode,
243 new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY),
244 SERIALIZER::decode)
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800245 .whenComplete((r, e) -> throwIfLocked(r.status()))
246 .thenApply(v -> v.updated());
247 }
248
249 @Override
250 @SuppressWarnings("unchecked")
251 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700252 return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
253 UPDATE_AND_GET,
254 SERIALIZER::encode,
255 new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)),
256 SERIALIZER::decode)
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800257 .whenComplete((r, e) -> throwIfLocked(r.status()))
258 .thenApply(v -> v.updated());
259 }
260
261 @Override
262 public CompletableFuture<Void> clear() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700263 return proxy.<MapEntryUpdateResult.Status>invoke(CLEAR, SERIALIZER::decode)
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800264 .whenComplete((r, e) -> throwIfLocked(r))
265 .thenApply(v -> null);
266 }
267
268 @Override
269 @SuppressWarnings("unchecked")
270 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
271 Predicate<? super byte[]> condition,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700272 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800273 return get(key).thenCompose(r1 -> {
274 byte[] existingValue = r1 == null ? null : r1.value();
275 // if the condition evaluates to false, return existing value.
276 if (!condition.test(existingValue)) {
277 return CompletableFuture.completedFuture(r1);
278 }
279
280 AtomicReference<byte[]> computedValue = new AtomicReference<>();
281 // if remappingFunction throws an exception, return the exception.
282 try {
283 computedValue.set(remappingFunction.apply(key, existingValue));
284 } catch (Exception e) {
285 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
286 future.completeExceptionally(e);
287 return future;
288 }
289 if (computedValue.get() == null && r1 == null) {
290 return CompletableFuture.completedFuture(null);
291 }
292 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
293 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700294 return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
295 UPDATE_AND_GET,
296 SERIALIZER::encode,
297 new UpdateAndGet(key,
298 computedValue.get(),
299 valueMatch,
300 versionMatch),
301 SERIALIZER::decode)
302 .whenComplete((r, e) -> throwIfLocked(r.status()))
303 .thenCompose(r -> {
304 if (r.status() == MapEntryUpdateResult.Status.PRECONDITION_FAILED ||
305 r.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
306 return Tools.exceptionalFuture(new ConsistentMapException.ConcurrentModification());
307 }
308 return CompletableFuture.completedFuture(r);
309 })
310 .thenApply(v -> v.newValue());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800311 });
312 }
313
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800314 @Override
Madan Jampani0463cf92016-05-04 14:46:08 -0700315 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700316 Executor executor) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800317 if (mapEventListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700318 return proxy.invoke(ADD_LISTENER).thenRun(() -> mapEventListeners.put(listener, executor));
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800319 } else {
Madan Jampani0463cf92016-05-04 14:46:08 -0700320 mapEventListeners.put(listener, executor);
Madan Jampani40f022e2016-03-02 21:35:14 -0800321 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800322 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800323 }
324
325 @Override
326 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
Madan Jampani0463cf92016-05-04 14:46:08 -0700327 if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700328 return proxy.invoke(REMOVE_LISTENER).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800329 }
330 return CompletableFuture.completedFuture(null);
331 }
332
333 private void throwIfLocked(MapEntryUpdateResult.Status status) {
334 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
335 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
336 }
337 }
338
Madan Jampanicadd70b2016-02-08 13:45:43 -0800339 @Override
Jordan Halterman948d6592017-04-20 17:18:24 -0700340 public CompletableFuture<Version> begin(TransactionId transactionId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700341 return proxy.<TransactionBegin, Long>invoke(
342 BEGIN,
343 SERIALIZER::encode,
344 new TransactionBegin(transactionId),
345 SERIALIZER::decode)
346 .thenApply(Version::new);
Jordan Halterman948d6592017-04-20 17:18:24 -0700347 }
348
349 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700350 public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
351 return proxy.<TransactionPrepare, PrepareResult>invoke(
352 PREPARE,
353 SERIALIZER::encode,
354 new TransactionPrepare(transactionLog),
355 SERIALIZER::decode)
Jordan Halterman948d6592017-04-20 17:18:24 -0700356 .thenApply(v -> v == PrepareResult.OK);
357 }
358
359 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700360 public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
361 return proxy.<TransactionPrepareAndCommit, PrepareResult>invoke(
362 PREPARE_AND_COMMIT,
363 SERIALIZER::encode,
364 new TransactionPrepareAndCommit(transactionLog),
365 SERIALIZER::decode)
Jordan Halterman948d6592017-04-20 17:18:24 -0700366 .thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800367 }
368
369 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800370 public CompletableFuture<Void> commit(TransactionId transactionId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700371 return proxy.<TransactionCommit, CommitResult>invoke(
372 COMMIT,
373 SERIALIZER::encode,
374 new TransactionCommit(transactionId),
375 SERIALIZER::decode)
376 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800377 }
378
379 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800380 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700381 return proxy.invoke(
382 ROLLBACK,
383 SERIALIZER::encode,
384 new TransactionRollback(transactionId),
385 SERIALIZER::decode)
386 .thenApply(v -> null);
Madan Jampani1d3b6172016-04-28 13:22:57 -0700387 }
Madan Jampanifb786382016-06-13 10:25:35 -0700388
389 private boolean isListening() {
390 return !mapEventListeners.isEmpty();
391 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700392}