blob: d25d4752c9536786d938784d4093e75c5fae3190 [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jordan Halterman2bf177c2017-06-29 01:49:08 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Jordan Halterman71635ae2017-07-28 10:35:43 -070018import java.util.Arrays;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070019import java.util.Collection;
20import java.util.HashMap;
21import java.util.Iterator;
22import java.util.LinkedHashMap;
23import java.util.List;
24import java.util.Map;
25import java.util.Set;
Jordan Halterman71635ae2017-07-28 10:35:43 -070026import java.util.function.Predicate;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070027import java.util.stream.Collectors;
28
Jordan Halterman2bf177c2017-06-29 01:49:08 -070029import com.google.common.collect.Lists;
30import com.google.common.collect.Maps;
31import com.google.common.collect.Sets;
32import io.atomix.protocols.raft.service.AbstractRaftService;
33import io.atomix.protocols.raft.service.Commit;
34import io.atomix.protocols.raft.service.RaftServiceExecutor;
35import io.atomix.protocols.raft.session.RaftSession;
36import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
37import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
38import org.onlab.util.KryoNamespace;
39import org.onlab.util.Match;
40import org.onosproject.store.primitives.MapUpdate;
41import org.onosproject.store.primitives.TransactionId;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
Jordan Halterman71635ae2017-07-28 10:35:43 -070046import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Remove;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveValue;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveVersion;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Replace;
51import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceValue;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceVersion;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070053import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
54import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
55import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
56import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
57import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070058import org.onosproject.store.serializers.KryoNamespaces;
59import org.onosproject.store.service.MapEvent;
60import org.onosproject.store.service.Serializer;
61import org.onosproject.store.service.TransactionLog;
62import org.onosproject.store.service.Versioned;
63
64import static com.google.common.base.Preconditions.checkState;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapEvents.CHANGE;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ENTRY_SET;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
75import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
76import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
77import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
78import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
Jordan Halterman71635ae2017-07-28 10:35:43 -070079import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
80import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_AND_GET;
81import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_IF_ABSENT;
82import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070083import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
Jordan Halterman71635ae2017-07-28 10:35:43 -070084import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VALUE;
85import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VERSION;
86import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE;
87import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VALUE;
88import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VERSION;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070089import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
90import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070091import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070092
93/**
94 * State Machine for {@link AtomixConsistentMap} resource.
95 */
96public class AtomixConsistentMapService extends AbstractRaftService {
97
98 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
99 .register(KryoNamespaces.BASIC)
100 .register(AtomixConsistentMapOperations.NAMESPACE)
101 .register(AtomixConsistentMapEvents.NAMESPACE)
Jordan Halterman71635ae2017-07-28 10:35:43 -0700102 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700103 .register(TransactionScope.class)
104 .register(TransactionLog.class)
105 .register(TransactionId.class)
106 .register(MapEntryValue.class)
107 .register(MapEntryValue.Type.class)
108 .register(new HashMap().keySet().getClass())
109 .build());
110
Jordan Halterman71635ae2017-07-28 10:35:43 -0700111 protected Map<Long, RaftSession> listeners = new LinkedHashMap<>();
112 private Map<String, MapEntryValue> map;
113 protected Set<String> preparedKeys = Sets.newHashSet();
114 protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
115 protected long currentVersion;
116
117 public AtomixConsistentMapService() {
118 map = createMap();
119 }
120
121 protected Map<String, MapEntryValue> createMap() {
122 return Maps.newHashMap();
123 }
124
125 protected Map<String, MapEntryValue> entries() {
126 return map;
127 }
128
129 protected Serializer serializer() {
130 return SERIALIZER;
131 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700132
133 @Override
134 public void snapshot(SnapshotWriter writer) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700135 writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer()::encode);
136 writer.writeObject(preparedKeys, serializer()::encode);
137 writer.writeObject(entries(), serializer()::encode);
138 writer.writeObject(activeTransactions, serializer()::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700139 writer.writeLong(currentVersion);
140 }
141
142 @Override
143 public void install(SnapshotReader reader) {
144 listeners = new LinkedHashMap<>();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700145 for (Long sessionId : reader.<Set<Long>>readObject(serializer()::decode)) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700146 listeners.put(sessionId, sessions().getSession(sessionId));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700147 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700148 preparedKeys = reader.readObject(serializer()::decode);
149 map = reader.readObject(serializer()::decode);
150 activeTransactions = reader.readObject(serializer()::decode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700151 currentVersion = reader.readLong();
152 }
153
154 @Override
155 protected void configure(RaftServiceExecutor executor) {
156 // Listeners
Jordan Halterman71635ae2017-07-28 10:35:43 -0700157 executor.register(ADD_LISTENER, (Commit<Void> c) -> listen(c.session()));
158 executor.register(REMOVE_LISTENER, (Commit<Void> c) -> unlisten(c.session()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700159 // Queries
Jordan Halterman71635ae2017-07-28 10:35:43 -0700160 executor.register(CONTAINS_KEY, serializer()::decode, this::containsKey, serializer()::encode);
161 executor.register(CONTAINS_VALUE, serializer()::decode, this::containsValue, serializer()::encode);
162 executor.register(ENTRY_SET, (Commit<Void> c) -> entrySet(), serializer()::encode);
163 executor.register(GET, serializer()::decode, this::get, serializer()::encode);
164 executor.register(GET_OR_DEFAULT, serializer()::decode, this::getOrDefault, serializer()::encode);
165 executor.register(IS_EMPTY, (Commit<Void> c) -> isEmpty(), serializer()::encode);
166 executor.register(KEY_SET, (Commit<Void> c) -> keySet(), serializer()::encode);
167 executor.register(SIZE, (Commit<Void> c) -> size(), serializer()::encode);
168 executor.register(VALUES, (Commit<Void> c) -> values(), serializer()::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700169 // Commands
Jordan Halterman71635ae2017-07-28 10:35:43 -0700170 executor.register(PUT, serializer()::decode, this::put, serializer()::encode);
171 executor.register(PUT_IF_ABSENT, serializer()::decode, this::putIfAbsent, serializer()::encode);
172 executor.register(PUT_AND_GET, serializer()::decode, this::putAndGet, serializer()::encode);
173 executor.register(REMOVE, serializer()::decode, this::remove, serializer()::encode);
174 executor.register(REMOVE_VALUE, serializer()::decode, this::removeValue, serializer()::encode);
175 executor.register(REMOVE_VERSION, serializer()::decode, this::removeVersion, serializer()::encode);
176 executor.register(REPLACE, serializer()::decode, this::replace, serializer()::encode);
177 executor.register(REPLACE_VALUE, serializer()::decode, this::replaceValue, serializer()::encode);
178 executor.register(REPLACE_VERSION, serializer()::decode, this::replaceVersion, serializer()::encode);
179 executor.register(CLEAR, (Commit<Void> c) -> clear(), serializer()::encode);
180 executor.register(BEGIN, serializer()::decode, this::begin, serializer()::encode);
181 executor.register(PREPARE, serializer()::decode, this::prepare, serializer()::encode);
182 executor.register(PREPARE_AND_COMMIT, serializer()::decode, this::prepareAndCommit, serializer()::encode);
183 executor.register(COMMIT, serializer()::decode, this::commit, serializer()::encode);
184 executor.register(ROLLBACK, serializer()::decode, this::rollback, serializer()::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700185 }
186
187 /**
188 * Handles a contains key commit.
189 *
190 * @param commit containsKey commit
191 * @return {@code true} if map contains key
192 */
193 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700194 MapEntryValue value = entries().get(commit.value().key());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700195 return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
196 }
197
198 /**
199 * Handles a contains value commit.
200 *
201 * @param commit containsValue commit
202 * @return {@code true} if map contains value
203 */
204 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
205 Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
Jordan Halterman71635ae2017-07-28 10:35:43 -0700206 return entries().values().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700207 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
208 .anyMatch(value -> valueMatch.matches(value.value()));
209 }
210
211 /**
212 * Handles a get commit.
213 *
214 * @param commit get commit
215 * @return value mapped to key
216 */
217 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700218 return toVersioned(entries().get(commit.value().key()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700219 }
220
221 /**
222 * Handles a get or default commit.
223 *
224 * @param commit get or default commit
225 * @return value mapped to key
226 */
227 protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700228 MapEntryValue value = entries().get(commit.value().key());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700229 if (value == null) {
230 return new Versioned<>(commit.value().defaultValue(), 0);
231 } else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
232 return new Versioned<>(commit.value().defaultValue(), value.version);
233 } else {
234 return new Versioned<>(value.value(), value.version);
235 }
236 }
237
238 /**
Jordan Halterman71635ae2017-07-28 10:35:43 -0700239 * Handles a size commit.
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700240 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700241 * @return number of entries in map
242 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700243 protected int size() {
244 return (int) entries().values().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700245 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
246 .count();
247 }
248
249 /**
250 * Handles an is empty commit.
251 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700252 * @return {@code true} if map is empty
253 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700254 protected boolean isEmpty() {
255 return entries().values().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700256 .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
257 }
258
259 /**
260 * Handles a keySet commit.
261 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700262 * @return set of keys in map
263 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700264 protected Set<String> keySet() {
265 return entries().entrySet().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700266 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
267 .map(Map.Entry::getKey)
268 .collect(Collectors.toSet());
269 }
270
271 /**
272 * Handles a values commit.
273 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700274 * @return collection of values in map
275 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700276 protected Collection<Versioned<byte[]>> values() {
277 return entries().entrySet().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700278 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
279 .map(entry -> toVersioned(entry.getValue()))
280 .collect(Collectors.toList());
281 }
282
283 /**
284 * Handles a entry set commit.
285 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700286 * @return set of map entries
287 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700288 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet() {
289 return entries().entrySet().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700290 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
291 .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
292 .collect(Collectors.toSet());
293 }
294
295 /**
Jordan Halterman71635ae2017-07-28 10:35:43 -0700296 * Returns a boolean indicating whether the given MapEntryValues are equal.
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700297 *
Jordan Halterman71635ae2017-07-28 10:35:43 -0700298 * @param oldValue the first value to compare
299 * @param newValue the second value to compare
300 * @return indicates whether the two values are equal
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700301 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700302 protected boolean valuesEqual(MapEntryValue oldValue, MapEntryValue newValue) {
303 return (oldValue == null && newValue == null)
304 || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
305 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700306
Jordan Halterman71635ae2017-07-28 10:35:43 -0700307 /**
308 * Returns a boolean indicating whether the given entry values are equal.
309 *
310 * @param oldValue the first value to compare
311 * @param newValue the second value to compare
312 * @return indicates whether the two values are equal
313 */
314 protected boolean valuesEqual(byte[] oldValue, byte[] newValue) {
315 return (oldValue == null && newValue == null)
316 || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
317 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700318
Jordan Halterman71635ae2017-07-28 10:35:43 -0700319 /**
320 * Returns a boolean indicating whether the given MapEntryValue is null or a tombstone.
321 *
322 * @param value the value to check
323 * @return indicates whether the given value is null or is a tombstone
324 */
325 protected boolean valueIsNull(MapEntryValue value) {
326 return value == null || value.type() == MapEntryValue.Type.TOMBSTONE;
327 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700328
Jordan Halterman71635ae2017-07-28 10:35:43 -0700329 /**
330 * Handles a put commit.
331 *
332 * @param commit put commit
333 * @return map entry update result
334 */
335 protected MapEntryUpdateResult<String, byte[]> put(Commit<? extends Put> commit) {
336 String key = commit.value().key();
337 MapEntryValue oldValue = entries().get(key);
338 MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
Jordan Halterman70df7672017-08-03 16:25:19 -0700339
340 // If the value is null or a tombstone, this is an insert.
341 // Otherwise, only update the value if it has changed to reduce the number of events.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700342 if (valueIsNull(oldValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700343 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700344 if (preparedKeys.contains(key)) {
345 return new MapEntryUpdateResult<>(
346 MapEntryUpdateResult.Status.WRITE_LOCK,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700347 commit.index(),
Jordan Halterman71635ae2017-07-28 10:35:43 -0700348 key,
349 toVersioned(oldValue));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700350 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700351 entries().put(commit.value().key(),
352 new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
353 Versioned<byte[]> result = toVersioned(oldValue);
354 publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
355 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
356 } else if (!valuesEqual(oldValue, newValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700357 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700358 if (preparedKeys.contains(key)) {
359 return new MapEntryUpdateResult<>(
360 MapEntryUpdateResult.Status.WRITE_LOCK,
361 commit.index(),
362 key,
363 toVersioned(oldValue));
364 }
365 entries().put(commit.value().key(),
366 new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
367 Versioned<byte[]> result = toVersioned(oldValue);
368 publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
369 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700370 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700371 // If the value hasn't changed, return a NOOP result.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700372 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
373 }
374
375 /**
376 * Handles a putIfAbsent commit.
377 *
378 * @param commit putIfAbsent commit
379 * @return map entry update result
380 */
381 protected MapEntryUpdateResult<String, byte[]> putIfAbsent(Commit<? extends Put> commit) {
382 String key = commit.value().key();
383 MapEntryValue oldValue = entries().get(key);
Jordan Halterman70df7672017-08-03 16:25:19 -0700384
385 // If the value is null, this is an INSERT.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700386 if (valueIsNull(oldValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700387 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700388 if (preparedKeys.contains(key)) {
389 return new MapEntryUpdateResult<>(
390 MapEntryUpdateResult.Status.WRITE_LOCK,
391 commit.index(),
392 key,
393 toVersioned(oldValue));
394 }
395 MapEntryValue newValue = new MapEntryValue(
396 MapEntryValue.Type.VALUE,
397 commit.index(),
398 commit.value().value());
Jordan Halterman70df7672017-08-03 16:25:19 -0700399 entries().put(commit.value().key(), newValue);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700400 Versioned<byte[]> result = toVersioned(newValue);
401 publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
402 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
403 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700404 return new MapEntryUpdateResult<>(
405 MapEntryUpdateResult.Status.PRECONDITION_FAILED,
406 commit.index(),
407 key,
408 toVersioned(oldValue));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700409 }
410
411 /**
412 * Handles a putAndGet commit.
413 *
414 * @param commit putAndGet commit
415 * @return map entry update result
416 */
417 protected MapEntryUpdateResult<String, byte[]> putAndGet(Commit<? extends Put> commit) {
418 String key = commit.value().key();
419 MapEntryValue oldValue = entries().get(key);
420 MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
Jordan Halterman70df7672017-08-03 16:25:19 -0700421
422 // If the value is null or a tombstone, this is an insert.
423 // Otherwise, only update the value if it has changed to reduce the number of events.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700424 if (valueIsNull(oldValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700425 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700426 if (preparedKeys.contains(key)) {
427 return new MapEntryUpdateResult<>(
428 MapEntryUpdateResult.Status.WRITE_LOCK,
429 commit.index(),
430 key,
431 toVersioned(oldValue));
432 }
433 entries().put(commit.value().key(), newValue);
434 Versioned<byte[]> result = toVersioned(newValue);
435 publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
436 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
437 } else if (!valuesEqual(oldValue, newValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700438 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700439 if (preparedKeys.contains(key)) {
440 return new MapEntryUpdateResult<>(
441 MapEntryUpdateResult.Status.WRITE_LOCK,
442 commit.index(),
443 key,
444 toVersioned(oldValue));
445 }
446 entries().put(commit.value().key(), newValue);
447 Versioned<byte[]> result = toVersioned(newValue);
448 publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, result, toVersioned(oldValue)));
449 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
450 }
451 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
452 }
453
454 /**
455 * Handles a remove commit.
456 *
457 * @param index the commit index
458 * @param key the key to remove
459 * @param predicate predicate to determine whether to remove the entry
460 * @return map entry update result
461 */
462 private MapEntryUpdateResult<String, byte[]> removeIf(long index, String key, Predicate<MapEntryValue> predicate) {
463 MapEntryValue value = entries().get(key);
Jordan Halterman70df7672017-08-03 16:25:19 -0700464
465 // If the value does not exist or doesn't match the predicate, return a PRECONDITION_FAILED error.
466 if (valueIsNull(value) || !predicate.test(value)) {
467 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.PRECONDITION_FAILED, index, key, null);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700468 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700469
470 // If the key has been locked by a transaction, return a WRITE_LOCK error.
471 if (preparedKeys.contains(key)) {
472 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700473 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700474
475 // If no transactions are active, remove the key. Otherwise, replace it with a tombstone.
476 if (activeTransactions.isEmpty()) {
477 entries().remove(key);
478 } else {
479 entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, index, null));
480 }
481
Jordan Halterman71635ae2017-07-28 10:35:43 -0700482 Versioned<byte[]> result = toVersioned(value);
Jordan Halterman70df7672017-08-03 16:25:19 -0700483 publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700484 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
485 }
486
487 /**
488 * Handles a remove commit.
489 *
490 * @param commit remove commit
491 * @return map entry update result
492 */
493 protected MapEntryUpdateResult<String, byte[]> remove(Commit<? extends Remove> commit) {
494 return removeIf(commit.index(), commit.value().key(), v -> true);
495 }
496
497 /**
498 * Handles a removeValue commit.
499 *
500 * @param commit removeValue commit
501 * @return map entry update result
502 */
503 protected MapEntryUpdateResult<String, byte[]> removeValue(Commit<? extends RemoveValue> commit) {
504 return removeIf(commit.index(), commit.value().key(), v ->
505 valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
506 }
507
508 /**
509 * Handles a removeVersion commit.
510 *
511 * @param commit removeVersion commit
512 * @return map entry update result
513 */
514 protected MapEntryUpdateResult<String, byte[]> removeVersion(Commit<? extends RemoveVersion> commit) {
515 return removeIf(commit.index(), commit.value().key(), v -> v.version() == commit.value().version());
516 }
517
518 /**
519 * Handles a replace commit.
520 *
521 * @param index the commit index
522 * @param key the key to replace
523 * @param newValue the value with which to replace the key
524 * @param predicate a predicate to determine whether to replace the key
525 * @return map entry update result
526 */
527 private MapEntryUpdateResult<String, byte[]> replaceIf(
528 long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
529 MapEntryValue oldValue = entries().get(key);
Jordan Halterman70df7672017-08-03 16:25:19 -0700530
531 // If the key is not set or the current value doesn't match the predicate, return a PRECONDITION_FAILED error.
532 if (valueIsNull(oldValue) || !predicate.test(oldValue)) {
533 return new MapEntryUpdateResult<>(
534 MapEntryUpdateResult.Status.PRECONDITION_FAILED,
535 index,
536 key,
537 toVersioned(oldValue));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700538 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700539
540 // If the key has been locked by a transaction, return a WRITE_LOCK error.
541 if (preparedKeys.contains(key)) {
542 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700543 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700544
Jordan Halterman71635ae2017-07-28 10:35:43 -0700545 entries().put(key, newValue);
546 Versioned<byte[]> result = toVersioned(oldValue);
Jordan Halterman70df7672017-08-03 16:25:19 -0700547 publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700548 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
549 }
550
551 /**
552 * Handles a replace commit.
553 *
554 * @param commit replace commit
555 * @return map entry update result
556 */
557 protected MapEntryUpdateResult<String, byte[]> replace(Commit<? extends Replace> commit) {
558 MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
559 return replaceIf(commit.index(), commit.value().key(), value, v -> true);
560 }
561
562 /**
563 * Handles a replaceValue commit.
564 *
565 * @param commit replaceValue commit
566 * @return map entry update result
567 */
568 protected MapEntryUpdateResult<String, byte[]> replaceValue(Commit<? extends ReplaceValue> commit) {
569 MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
570 return replaceIf(commit.index(), commit.value().key(), value,
571 v -> valuesEqual(v.value(), commit.value().oldValue()));
572 }
573
574 /**
575 * Handles a replaceVersion commit.
576 *
577 * @param commit replaceVersion commit
578 * @return map entry update result
579 */
580 protected MapEntryUpdateResult<String, byte[]> replaceVersion(Commit<? extends ReplaceVersion> commit) {
581 MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
582 return replaceIf(commit.index(), commit.value().key(), value,
583 v -> v.version() == commit.value().oldVersion());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700584 }
585
586 /**
587 * Handles a clear commit.
588 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700589 * @return clear result
590 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700591 protected MapEntryUpdateResult.Status clear() {
592 Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
593 Map<String, MapEntryValue> entriesToAdd = new HashMap<>();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700594 while (iterator.hasNext()) {
595 Map.Entry<String, MapEntryValue> entry = iterator.next();
596 String key = entry.getKey();
597 MapEntryValue value = entry.getValue();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700598 if (!valueIsNull(value)) {
599 Versioned<byte[]> removedValue = new Versioned<>(value.value(), value.version());
600 publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, removedValue));
601 if (activeTransactions.isEmpty()) {
602 iterator.remove();
603 } else {
604 entriesToAdd.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
605 }
606 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700607 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700608 entries().putAll(entriesToAdd);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700609 return MapEntryUpdateResult.Status.OK;
610 }
611
612 /**
613 * Handles a listen commit.
614 *
Jordan Halterman71635ae2017-07-28 10:35:43 -0700615 * @param session listen session
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700616 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700617 protected void listen(RaftSession session) {
618 listeners.put(session.sessionId().id(), session);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700619 }
620
621 /**
622 * Handles an unlisten commit.
623 *
Jordan Halterman71635ae2017-07-28 10:35:43 -0700624 * @param session unlisten session
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700625 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700626 protected void unlisten(RaftSession session) {
627 listeners.remove(session.sessionId().id());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700628 }
629
630 /**
631 * Handles a begin commit.
632 *
633 * @param commit transaction begin commit
634 * @return transaction state version
635 */
636 protected long begin(Commit<? extends TransactionBegin> commit) {
637 long version = commit.index();
638 activeTransactions.put(commit.value().transactionId(), new TransactionScope(version));
639 return version;
640 }
641
642 /**
643 * Handles an prepare and commit commit.
644 *
645 * @param commit transaction prepare and commit commit
646 * @return prepare result
647 */
648 protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
649 TransactionId transactionId = commit.value().transactionLog().transactionId();
650 PrepareResult prepareResult = prepare(commit);
651 TransactionScope transactionScope = activeTransactions.remove(transactionId);
652 if (prepareResult == PrepareResult.OK) {
653 this.currentVersion = commit.index();
654 transactionScope = transactionScope.prepared(commit);
655 commitTransaction(transactionScope);
656 }
657 discardTombstones();
658 return prepareResult;
659 }
660
661 /**
662 * Handles an prepare commit.
663 *
664 * @param commit transaction prepare commit
665 * @return prepare result
666 */
667 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
668 try {
669 TransactionLog<MapUpdate<String, byte[]>> transactionLog = commit.value().transactionLog();
670
671 // Iterate through records in the transaction log and perform isolation checks.
672 for (MapUpdate<String, byte[]> record : transactionLog.records()) {
673 String key = record.key();
674
675 // If the record is a VERSION_MATCH then check that the record's version matches the current
676 // version of the state machine.
677 if (record.type() == MapUpdate.Type.VERSION_MATCH && key == null) {
678 if (record.version() > currentVersion) {
679 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
680 } else {
681 continue;
682 }
683 }
684
685 // If the prepared keys already contains the key contained within the record, that indicates a
686 // conflict with a concurrent transaction.
687 if (preparedKeys.contains(key)) {
688 return PrepareResult.CONCURRENT_TRANSACTION;
689 }
690
691 // Read the existing value from the map.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700692 MapEntryValue existingValue = entries().get(key);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700693
694 // Note: if the existing value is null, that means the key has not changed during the transaction,
695 // otherwise a tombstone would have been retained.
696 if (existingValue == null) {
697 // If the value is null, ensure the version is equal to the transaction version.
698 if (record.version() != transactionLog.version()) {
699 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
700 }
701 } else {
702 // If the value is non-null, compare the current version with the record version.
703 if (existingValue.version() > record.version()) {
704 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
705 }
706 }
707 }
708
709 // No violations detected. Mark modified keys locked for transactions.
710 transactionLog.records().forEach(record -> {
711 if (record.type() != MapUpdate.Type.VERSION_MATCH) {
712 preparedKeys.add(record.key());
713 }
714 });
715
716 // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
717 // coordinator is communicating with another node. Transactions assume that the client is communicating
718 // with a single leader in order to limit the overhead of retaining tombstones.
719 TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
720 if (transactionScope == null) {
721 activeTransactions.put(
722 transactionLog.transactionId(),
723 new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
724 return PrepareResult.PARTIAL_FAILURE;
725 } else {
726 activeTransactions.put(
727 transactionLog.transactionId(),
728 transactionScope.prepared(commit));
729 return PrepareResult.OK;
730 }
731 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700732 logger().warn("Failure applying {}", commit, e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800733 throw new IllegalStateException(e);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700734 }
735 }
736
737 /**
738 * Handles an commit commit (ha!).
739 *
740 * @param commit transaction commit commit
741 * @return commit result
742 */
743 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
744 TransactionId transactionId = commit.value().transactionId();
745 TransactionScope transactionScope = activeTransactions.remove(transactionId);
746 if (transactionScope == null) {
747 return CommitResult.UNKNOWN_TRANSACTION_ID;
748 }
749
750 try {
751 this.currentVersion = commit.index();
752 return commitTransaction(transactionScope);
753 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700754 logger().warn("Failure applying {}", commit, e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800755 throw new IllegalStateException(e);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700756 } finally {
757 discardTombstones();
758 }
759 }
760
761 /**
762 * Applies committed operations to the state machine.
763 */
764 private CommitResult commitTransaction(TransactionScope transactionScope) {
765 TransactionLog<MapUpdate<String, byte[]>> transactionLog = transactionScope.transactionLog();
766 boolean retainTombstones = !activeTransactions.isEmpty();
767
768 List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
769 for (MapUpdate<String, byte[]> record : transactionLog.records()) {
770 if (record.type() == MapUpdate.Type.VERSION_MATCH) {
771 continue;
772 }
773
774 String key = record.key();
775 checkState(preparedKeys.remove(key), "key is not prepared");
776
777 if (record.type() == MapUpdate.Type.LOCK) {
778 continue;
779 }
780
Jordan Halterman71635ae2017-07-28 10:35:43 -0700781 MapEntryValue previousValue = entries().remove(key);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700782 MapEntryValue newValue = null;
783
784 // If the record is not a delete, create a transactional commit.
785 if (record.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
786 newValue = new MapEntryValue(MapEntryValue.Type.VALUE, currentVersion, record.value());
787 } else if (retainTombstones) {
788 // For deletes, if tombstones need to be retained then create and store a tombstone commit.
789 newValue = new MapEntryValue(MapEntryValue.Type.TOMBSTONE, currentVersion, null);
790 }
791
Jordan Halterman71635ae2017-07-28 10:35:43 -0700792 MapEvent<String, byte[]> event;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700793 if (newValue != null) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700794 entries().put(key, newValue);
795 if (!valueIsNull(newValue)) {
796 if (!valueIsNull(previousValue)) {
797 event = new MapEvent<>(
798 MapEvent.Type.UPDATE,
799 "",
800 key,
801 toVersioned(newValue),
802 toVersioned(previousValue));
803 } else {
804 event = new MapEvent<>(
805 MapEvent.Type.INSERT,
806 "",
807 key,
808 toVersioned(newValue),
809 null);
810 }
811 } else {
812 event = new MapEvent<>(
813 MapEvent.Type.REMOVE,
814 "",
815 key,
816 null,
817 toVersioned(previousValue));
818 }
819 } else {
820 event = new MapEvent<>(
821 MapEvent.Type.REMOVE,
822 "",
823 key,
824 null,
825 toVersioned(previousValue));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700826 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700827 eventsToPublish.add(event);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700828 }
829 publish(eventsToPublish);
830 return CommitResult.OK;
831 }
832
833 /**
834 * Handles an rollback commit (ha!).
835 *
836 * @param commit transaction rollback commit
837 * @return rollback result
838 */
839 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
840 TransactionId transactionId = commit.value().transactionId();
841 TransactionScope transactionScope = activeTransactions.remove(transactionId);
842 if (transactionScope == null) {
843 return RollbackResult.UNKNOWN_TRANSACTION_ID;
844 } else if (!transactionScope.isPrepared()) {
845 discardTombstones();
846 return RollbackResult.OK;
847 } else {
848 try {
849 transactionScope.transactionLog().records()
850 .forEach(record -> {
851 if (record.type() != MapUpdate.Type.VERSION_MATCH) {
852 preparedKeys.remove(record.key());
853 }
854 });
855 return RollbackResult.OK;
856 } finally {
857 discardTombstones();
858 }
859 }
860
861 }
862
863 /**
864 * Discards tombstones no longer needed by active transactions.
865 */
866 private void discardTombstones() {
867 if (activeTransactions.isEmpty()) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700868 Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700869 while (iterator.hasNext()) {
870 MapEntryValue value = iterator.next().getValue();
871 if (value.type() == MapEntryValue.Type.TOMBSTONE) {
872 iterator.remove();
873 }
874 }
875 } else {
876 long lowWaterMark = activeTransactions.values().stream()
877 .mapToLong(TransactionScope::version)
878 .min().getAsLong();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700879 Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700880 while (iterator.hasNext()) {
881 MapEntryValue value = iterator.next().getValue();
882 if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
883 iterator.remove();
884 }
885 }
886 }
887 }
888
889 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700890 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
891 * @param value map entry value
892 * @return versioned instance
893 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700894 protected Versioned<byte[]> toVersioned(MapEntryValue value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700895 return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
896 ? new Versioned<>(value.value(), value.version()) : null;
897 }
898
899 /**
Jordan Halterman71635ae2017-07-28 10:35:43 -0700900 * Publishes an event to listeners.
901 *
902 * @param event event to publish
903 */
904 private void publish(MapEvent<String, byte[]> event) {
905 publish(Lists.newArrayList(event));
906 }
907
908 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700909 * Publishes events to listeners.
910 *
911 * @param events list of map event to publish
912 */
913 private void publish(List<MapEvent<String, byte[]>> events) {
914 listeners.values().forEach(session -> {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700915 session.publish(CHANGE, serializer()::encode, events);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700916 });
917 }
918
919 @Override
920 public void onExpire(RaftSession session) {
921 closeListener(session.sessionId().id());
922 }
923
924 @Override
925 public void onClose(RaftSession session) {
926 closeListener(session.sessionId().id());
927 }
928
929 private void closeListener(Long sessionId) {
930 listeners.remove(sessionId);
931 }
932
933 /**
934 * Interface implemented by map values.
935 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700936 protected static class MapEntryValue {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700937 protected final Type type;
938 protected final long version;
939 protected final byte[] value;
940
941 MapEntryValue(Type type, long version, byte[] value) {
942 this.type = type;
943 this.version = version;
944 this.value = value;
945 }
946
947 /**
948 * Returns the value type.
949 *
950 * @return the value type
951 */
952 Type type() {
953 return type;
954 }
955
956 /**
957 * Returns the version of the value.
958 *
959 * @return version
960 */
961 long version() {
962 return version;
963 }
964
965 /**
966 * Returns the raw {@code byte[]}.
967 *
968 * @return raw value
969 */
970 byte[] value() {
971 return value;
972 }
973
974 /**
975 * Value type.
976 */
977 enum Type {
978 VALUE,
979 TOMBSTONE,
980 }
981 }
982
983 /**
984 * Map transaction scope.
985 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700986 protected static final class TransactionScope {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700987 private final long version;
988 private final TransactionLog<MapUpdate<String, byte[]>> transactionLog;
989
990 private TransactionScope(long version) {
991 this(version, null);
992 }
993
994 private TransactionScope(long version, TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
995 this.version = version;
996 this.transactionLog = transactionLog;
997 }
998
999 /**
1000 * Returns the transaction version.
1001 *
1002 * @return the transaction version
1003 */
1004 long version() {
1005 return version;
1006 }
1007
1008 /**
1009 * Returns whether this is a prepared transaction scope.
1010 *
1011 * @return whether this is a prepared transaction scope
1012 */
1013 boolean isPrepared() {
1014 return transactionLog != null;
1015 }
1016
1017 /**
1018 * Returns the transaction commit log.
1019 *
1020 * @return the transaction commit log
1021 */
1022 TransactionLog<MapUpdate<String, byte[]>> transactionLog() {
1023 checkState(isPrepared());
1024 return transactionLog;
1025 }
1026
1027 /**
1028 * Returns a new transaction scope with a prepare commit.
1029 *
1030 * @param commit the prepare commit
1031 * @return new transaction scope updated with the prepare commit
1032 */
1033 TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
1034 return new TransactionScope(version, commit.value().transactionLog());
1035 }
1036 }
1037}