blob: 2d18c357c92aa5e2408fb605e0a9c93277ecb0d0 [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jordan Halterman2bf177c2017-06-29 01:49:08 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
Jordan Halterman9fc40ed2018-06-21 00:00:15 -070019import java.util.ArrayList;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070020import java.util.Arrays;
21import java.util.Collection;
22import java.util.Comparator;
23import java.util.EnumSet;
24import java.util.HashMap;
Jordan Halterman9fc40ed2018-06-21 00:00:15 -070025import java.util.Iterator;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070026import java.util.LinkedHashMap;
27import java.util.List;
28import java.util.Map;
29import java.util.Set;
30import java.util.TreeSet;
31import java.util.concurrent.atomic.AtomicLong;
32import java.util.function.BiConsumer;
33import java.util.function.BinaryOperator;
34import java.util.function.Function;
35import java.util.function.Supplier;
36import java.util.stream.Collector;
37import java.util.stream.Collectors;
38
39import com.esotericsoftware.kryo.Kryo;
40import com.esotericsoftware.kryo.io.Input;
41import com.esotericsoftware.kryo.io.Output;
42import com.google.common.base.Preconditions;
43import com.google.common.collect.HashMultiset;
44import com.google.common.collect.ImmutableSet;
45import com.google.common.collect.Lists;
46import com.google.common.collect.Maps;
47import com.google.common.collect.Multiset;
48import com.google.common.collect.Sets;
49import io.atomix.protocols.raft.service.AbstractRaftService;
50import io.atomix.protocols.raft.service.Commit;
51import io.atomix.protocols.raft.service.RaftServiceExecutor;
pierfa48c6e2019-10-11 18:19:59 +020052import io.atomix.protocols.raft.service.impl.DefaultCommit;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070053import io.atomix.protocols.raft.session.RaftSession;
54import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
55import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
56import org.onlab.util.KryoNamespace;
57import org.onlab.util.Match;
58import org.onosproject.store.serializers.KryoNamespaces;
59import org.onosproject.store.service.MultimapEvent;
60import org.onosproject.store.service.Serializer;
61import org.onosproject.store.service.Versioned;
62
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
Jordan Halterman9fc40ed2018-06-21 00:00:15 -070066import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLOSE_ITERATOR;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070067import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsEntry;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsKey;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsValue;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ENTRIES;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
75import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
76import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
Jordan Halterman9fc40ed2018-06-21 00:00:15 -070077import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorBatch;
78import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorPosition;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070079import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
80import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
81import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
82import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
Jordan Halterman9fc40ed2018-06-21 00:00:15 -070083import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
84import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070085import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
Jordan Halterman99c654d2018-06-04 14:53:06 -070086import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070087import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
88import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
89import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
Jordan Halterman99c654d2018-06-04 14:53:06 -070090import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070091import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
92import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
93import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
94import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Replace;
95import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.SIZE;
96import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.VALUES;
pierfa48c6e2019-10-11 18:19:59 +020097import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MULTI_PUT_ALL;
98import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiPutAll;
99import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MULTI_REMOVE_ALL;
100import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemoveAll;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700101
102/**
103 * State Machine for {@link AtomixConsistentSetMultimap} resource.
104 */
105public class AtomixConsistentSetMultimapService extends AbstractRaftService {
Jordan Halterman9fc40ed2018-06-21 00:00:15 -0700106 private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700107
108 private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
109 .register(KryoNamespaces.BASIC)
110 .register(AtomixConsistentSetMultimapOperations.NAMESPACE)
111 .register(AtomixConsistentSetMultimapEvents.NAMESPACE)
112 .register(ByteArrayComparator.class)
113 .register(new HashMap().keySet().getClass())
114 .register(TreeSet.class)
115 .register(new com.esotericsoftware.kryo.Serializer<NonTransactionalCommit>() {
116 @Override
117 public void write(Kryo kryo, Output output, NonTransactionalCommit object) {
118 kryo.writeClassAndObject(output, object.valueSet);
119 }
120
121 @Override
122 @SuppressWarnings("unchecked")
123 public NonTransactionalCommit read(Kryo kryo, Input input, Class<NonTransactionalCommit> type) {
124 NonTransactionalCommit commit = new NonTransactionalCommit();
125 commit.valueSet.addAll((Collection<byte[]>) kryo.readClassAndObject(input));
126 return commit;
127 }
128 }, NonTransactionalCommit.class)
129 .build());
130
131 private AtomicLong globalVersion = new AtomicLong(1);
132 private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
Jordan Halterman9fc40ed2018-06-21 00:00:15 -0700133 private Map<String, MapEntryValue> backingMap = Maps.newConcurrentMap();
134 private Map<Long, IteratorContext> iterators = Maps.newHashMap();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700135
136 @Override
137 public void snapshot(SnapshotWriter writer) {
138 writer.writeLong(globalVersion.get());
139 writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer::encode);
140 writer.writeObject(backingMap, serializer::encode);
Jordan Halterman9fc40ed2018-06-21 00:00:15 -0700141
142 Map<Long, Long> iterators = Maps.newHashMap();
143 this.iterators.forEach((id, context) -> iterators.put(id, context.sessionId));
144 writer.writeObject(iterators, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700145 }
146
147 @Override
148 public void install(SnapshotReader reader) {
149 globalVersion = new AtomicLong(reader.readLong());
150
151 listeners = new LinkedHashMap<>();
152 for (Long sessionId : reader.<Set<Long>>readObject(serializer::decode)) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700153 listeners.put(sessionId, sessions().getSession(sessionId));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700154 }
155
156 backingMap = reader.readObject(serializer::decode);
Jordan Halterman9fc40ed2018-06-21 00:00:15 -0700157
158 Map<Long, Long> iterators = reader.readObject(serializer::decode);
159 this.iterators = Maps.newHashMap();
160 iterators.forEach((id, session) ->
161 this.iterators.put(id, new IteratorContext(session, backingMap.entrySet().iterator())));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700162 }
163
164 @Override
165 protected void configure(RaftServiceExecutor executor) {
166 executor.register(SIZE, this::size, serializer::encode);
167 executor.register(IS_EMPTY, this::isEmpty, serializer::encode);
168 executor.register(CONTAINS_KEY, serializer::decode, this::containsKey, serializer::encode);
169 executor.register(CONTAINS_VALUE, serializer::decode, this::containsValue, serializer::encode);
170 executor.register(CONTAINS_ENTRY, serializer::decode, this::containsEntry, serializer::encode);
171 executor.register(CLEAR, this::clear);
172 executor.register(KEY_SET, this::keySet, serializer::encode);
173 executor.register(KEYS, this::keys, serializer::encode);
174 executor.register(VALUES, this::values, serializer::encode);
175 executor.register(ENTRIES, this::entries, serializer::encode);
176 executor.register(GET, serializer::decode, this::get, serializer::encode);
pierfa48c6e2019-10-11 18:19:59 +0200177 executor.register(MULTI_REMOVE_ALL, serializer::decode, this::multiRemoveAll, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700178 executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode);
179 executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode);
Jordan Halterman99c654d2018-06-04 14:53:06 -0700180 executor.register(REMOVE_AND_GET, serializer::decode, this::removeAndGet, serializer::encode);
pierfa48c6e2019-10-11 18:19:59 +0200181 executor.register(MULTI_PUT_ALL, serializer::decode, this::multiPutAll, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700182 executor.register(PUT, serializer::decode, this::put, serializer::encode);
Jordan Halterman99c654d2018-06-04 14:53:06 -0700183 executor.register(PUT_AND_GET, serializer::decode, this::putAndGet, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700184 executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
185 executor.register(ADD_LISTENER, this::listen);
186 executor.register(REMOVE_LISTENER, this::unlisten);
Jordan Halterman9fc40ed2018-06-21 00:00:15 -0700187 executor.register(OPEN_ITERATOR, this::openIterator, serializer::encode);
188 executor.register(NEXT, serializer::decode, this::next, serializer::encode);
189 executor.register(CLOSE_ITERATOR, serializer::decode, this::closeIterator);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700190 }
191
192 @Override
193 public void onExpire(RaftSession session) {
194 listeners.remove(session.sessionId().id());
Jordan Halterman9fc40ed2018-06-21 00:00:15 -0700195 iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700196 }
197
198 @Override
199 public void onClose(RaftSession session) {
200 listeners.remove(session.sessionId().id());
Jordan Halterman9fc40ed2018-06-21 00:00:15 -0700201 iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700202 }
203
204 /**
205 * Handles a Size commit.
206 *
207 * @param commit Size commit
208 * @return number of unique key value pairs in the multimap
209 */
210 protected int size(Commit<Void> commit) {
211 return backingMap.values()
212 .stream()
213 .map(valueCollection -> valueCollection.values().size())
214 .collect(Collectors.summingInt(size -> size));
215 }
216
217 /**
218 * Handles an IsEmpty commit.
219 *
220 * @param commit IsEmpty commit
221 * @return true if the multimap contains no key-value pairs, else false
222 */
223 protected boolean isEmpty(Commit<Void> commit) {
224 return backingMap.isEmpty();
225 }
226
227 /**
228 * Handles a contains key commit.
229 *
230 * @param commit ContainsKey commit
231 * @return returns true if the key is in the multimap, else false
232 */
233 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
234 return backingMap.containsKey(commit.value().key());
235 }
236
237 /**
238 * Handles a ContainsValue commit.
239 *
240 * @param commit ContainsValue commit
241 * @return true if the value is in the multimap, else false
242 */
243 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
244 if (backingMap.values().isEmpty()) {
245 return false;
246 }
247 Match<byte[]> match = Match.ifValue(commit.value().value());
248 return backingMap
249 .values()
250 .stream()
251 .anyMatch(valueList ->
252 valueList
253 .values()
254 .stream()
255 .anyMatch(byteValue ->
256 match.matches(byteValue)));
257 }
258
259 /**
260 * Handles a ContainsEntry commit.
261 *
262 * @param commit ContainsEntry commit
263 * @return true if the key-value pair exists, else false
264 */
265 protected boolean containsEntry(Commit<? extends ContainsEntry> commit) {
266 MapEntryValue entryValue =
267 backingMap.get(commit.value().key());
268 if (entryValue == null) {
269 return false;
270 } else {
271 Match valueMatch = Match.ifValue(commit.value().value());
272 return entryValue
273 .values()
274 .stream()
275 .anyMatch(byteValue -> valueMatch.matches(byteValue));
276 }
277 }
278
279 /**
280 * Handles a Clear commit.
281 *
282 * @param commit Clear commit
283 */
284 protected void clear(Commit<Void> commit) {
285 backingMap.clear();
286 }
287
288 /**
289 * Handles a KeySet commit.
290 *
291 * @param commit KeySet commit
292 * @return a set of all keys in the multimap
293 */
294 protected Set<String> keySet(Commit<Void> commit) {
295 return ImmutableSet.copyOf(backingMap.keySet());
296 }
297
298 /**
299 * Handles a Keys commit.
300 *
301 * @param commit Keys commit
302 * @return a multiset of keys with each key included an equal number of
303 * times to the total key-value pairs in which that key participates
304 */
305 protected Multiset<String> keys(Commit<Void> commit) {
306 Multiset keys = HashMultiset.create();
307 backingMap.forEach((key, mapEntryValue) -> {
308 keys.add(key, mapEntryValue.values().size());
309 });
310 return keys;
311 }
312
313 /**
314 * Handles a Values commit.
315 *
316 * @param commit Values commit
317 * @return the set of values in the multimap with duplicates included
318 */
319 protected Multiset<byte[]> values(Commit<Void> commit) {
320 return backingMap
321 .values()
322 .stream()
323 .collect(new HashMultisetValueCollector());
324 }
325
326 /**
327 * Handles an Entries commit.
328 *
329 * @param commit Entries commit
330 * @return a set of all key-value pairs in the multimap
331 */
332 protected Collection<Map.Entry<String, byte[]>> entries(Commit<Void> commit) {
333 return backingMap
334 .entrySet()
335 .stream()
336 .collect(new EntrySetCollector());
337 }
338
339 /**
340 * Handles a Get commit.
341 *
342 * @param commit Get commit
343 * @return the collection of values associated with the key or an empty
344 * list if none exist
345 */
346 protected Versioned<Collection<? extends byte[]>> get(Commit<? extends Get> commit) {
347 return toVersioned(backingMap.get(commit.value().key()));
348 }
349
350 /**
351 * Handles a removeAll commit, and returns the previous mapping.
352 *
353 * @param commit removeAll commit
354 * @return collection of removed values
355 */
356 protected Versioned<Collection<? extends byte[]>> removeAll(Commit<? extends RemoveAll> commit) {
357 String key = commit.value().key();
358
359 if (!backingMap.containsKey(key)) {
360 return new Versioned<>(Sets.newHashSet(), -1);
361 }
362
363 Versioned<Collection<? extends byte[]>> removedValues =
364 backingMap.get(key).addCommit(commit);
365 publish(removedValues.value().stream()
366 .map(value -> new MultimapEvent<String, byte[]>(
367 "", key, null, value))
368 .collect(Collectors.toList()));
369 return removedValues;
370 }
371
372 /**
373 * Handles a multiRemove commit, returns true if the remove results in any
374 * change.
375 * @param commit multiRemove commit
376 * @return true if any change results, else false
377 */
378 protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
379 String key = commit.value().key();
380
381 if (!backingMap.containsKey(key)) {
382 return false;
383 }
384
385 Versioned<Collection<? extends byte[]>> removedValues = backingMap
Jordan Halterman99c654d2018-06-04 14:53:06 -0700386 .get(key)
387 .addCommit(commit);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700388
389 if (removedValues != null) {
390 if (removedValues.value().isEmpty()) {
391 backingMap.remove(key);
392 }
393
394 publish(removedValues.value().stream()
Jordan Halterman99c654d2018-06-04 14:53:06 -0700395 .map(value -> new MultimapEvent<String, byte[]>(
396 "", key, null, value))
397 .collect(Collectors.toList()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700398 return true;
399 }
400
401 return false;
402 }
403
404 /**
pierfa48c6e2019-10-11 18:19:59 +0200405 * Handles a MultiRemoveAll commit, returns true if any change results from this
406 * commit.
407 * @param commit a MultiRemoveAll commit
408 * @return true if this commit results in a change, else false
409 */
410 protected boolean multiRemoveAll(Commit<? extends MultiRemoveAll> commit) {
411 Map<String, Collection<? extends byte[]>> mapping = commit.value().mapping();
412 // There are no updates
413 if (mapping.isEmpty()) {
414 return false;
415 }
416 // Decompose the commit in several updates
417 boolean operationResult = false;
418 for (Map.Entry<String, Collection<? extends byte[]>> entry : mapping.entrySet()) {
419 MultiRemove update = new MultiRemove(entry.getKey(), entry.getValue(), commit.value().versionMatch());
420 Commit<? extends MultiRemove> commitUpdate = new DefaultCommit<>(commit.index(), commit.operation(),
421 update, commit.session(),
422 commit.wallClockTime().unixTimestamp());
423 if (multiRemove(commitUpdate)) {
424 operationResult = true;
425 }
426 }
427 return operationResult;
428 }
429
430 /**
Jordan Halterman99c654d2018-06-04 14:53:06 -0700431 * Handles a removeAndGet commit.
432 *
433 * @param commit multiRemove commit
434 * @return the updated values or null if the values are empty
435 */
436 protected Versioned<Collection<? extends byte[]>> removeAndGet(Commit<? extends MultiRemove> commit) {
437 String key = commit.value().key();
438
439 if (!backingMap.containsKey(key)) {
440 return null;
441 }
442
443 Versioned<Collection<? extends byte[]>> removedValues = backingMap
444 .get(key)
445 .addCommit(commit);
446
447 if (removedValues != null) {
448 if (removedValues.value().isEmpty()) {
449 backingMap.remove(key);
450 }
451
452 publish(removedValues.value().stream()
453 .map(value -> new MultimapEvent<String, byte[]>(
454 "", key, null, value))
455 .collect(Collectors.toList()));
456 }
457
458 return toVersioned(backingMap.get(key));
459 }
460
461 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700462 * Handles a put commit, returns true if any change results from this
463 * commit.
464 * @param commit a put commit
465 * @return true if this commit results in a change, else false
466 */
467 protected boolean put(Commit<? extends Put> commit) {
468 String key = commit.value().key();
469 if (commit.value().values().isEmpty()) {
470 return false;
471 }
472 if (!backingMap.containsKey(key)) {
473 backingMap.put(key, new NonTransactionalCommit());
474 }
475
476 Versioned<Collection<? extends byte[]>> addedValues = backingMap
Jordan Halterman99c654d2018-06-04 14:53:06 -0700477 .get(key)
478 .addCommit(commit);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700479
480 if (addedValues != null) {
481 publish(addedValues.value().stream()
Jordan Halterman99c654d2018-06-04 14:53:06 -0700482 .map(value -> new MultimapEvent<String, byte[]>(
483 "", key, value, null))
484 .collect(Collectors.toList()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700485 return true;
486 }
487
488 return false;
489 }
490
Jordan Halterman99c654d2018-06-04 14:53:06 -0700491 /**
pierfa48c6e2019-10-11 18:19:59 +0200492 * Handles a MultiPutAll commit, returns true if any change results from this
493 * commit.
494 * @param commit a MultiPutAll commit
495 * @return true if this commit results in a change, else false
496 */
497 protected boolean multiPutAll(Commit<? extends MultiPutAll> commit) {
498 Map<String, Collection<? extends byte[]>> mapping = commit.value().mapping();
499 // There are no updates
500 if (mapping.isEmpty()) {
501 return false;
502 }
503 // Decompose the commit in several updates
504 boolean operationResult = false;
505 for (Map.Entry<String, Collection<? extends byte[]>> entry : mapping.entrySet()) {
506 Put update = new Put(entry.getKey(), entry.getValue(), commit.value().versionMatch());
507 Commit<? extends Put> commitUpdate = new DefaultCommit<>(commit.index(), commit.operation(),
508 update, commit.session(),
509 commit.wallClockTime().unixTimestamp());
510 if (put(commitUpdate)) {
511 operationResult = true;
512 }
513 }
514 return operationResult;
515 }
516
517 /**
Jordan Halterman99c654d2018-06-04 14:53:06 -0700518 * Handles a putAndGet commit.
519 *
520 * @param commit a put commit
521 * @return the updated values
522 */
523 protected Versioned<Collection<? extends byte[]>> putAndGet(Commit<? extends Put> commit) {
524 String key = commit.value().key();
525 if (commit.value().values().isEmpty()) {
526 return null;
527 }
528 if (!backingMap.containsKey(key)) {
529 backingMap.put(key, new NonTransactionalCommit());
530 }
531
532 Versioned<Collection<? extends byte[]>> addedValues = backingMap
533 .get(key)
534 .addCommit(commit);
535
536 if (addedValues != null) {
537 publish(addedValues.value().stream()
538 .map(value -> new MultimapEvent<String, byte[]>(
539 "", key, value, null))
540 .collect(Collectors.toList()));
541 }
542
543 return toVersioned(backingMap.get(key));
544 }
545
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700546 protected Versioned<Collection<? extends byte[]>> replace(
547 Commit<? extends Replace> commit) {
Jordan Halterman7edca042018-07-11 09:49:15 -0700548 String key = commit.value().key();
549 if (!backingMap.containsKey(key)) {
550 backingMap.put(key, new NonTransactionalCommit());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700551 }
Jordan Halterman7edca042018-07-11 09:49:15 -0700552
553 Versioned<Collection<? extends byte[]>> values = backingMap.get(commit.value().key()).addCommit(commit);
554 if (values != null) {
555 Set<byte[]> addedValues = Sets.newTreeSet(new ByteArrayComparator());
556 addedValues.addAll(commit.value().values());
557
558 Set<byte[]> removedValues = Sets.newTreeSet(new ByteArrayComparator());
559 removedValues.addAll(values.value());
560
561 List<MultimapEvent<String, byte[]>> events = Lists.newArrayList();
562 Sets.difference(removedValues, addedValues)
563 .forEach(value -> events.add(new MultimapEvent<>("", key, null, value)));
564 Sets.difference(addedValues, removedValues)
565 .forEach(value -> events.add(new MultimapEvent<>("", key, value, null)));
566
567 publish(events);
568 }
569 return values;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700570 }
571
572 /**
573 * Handles a listen commit.
574 *
575 * @param commit listen commit
576 */
577 protected void listen(Commit<Void> commit) {
578 listeners.put(commit.session().sessionId().id(), commit.session());
579 }
580
581 /**
582 * Handles an unlisten commit.
583 *
584 * @param commit unlisten commit
585 */
586 protected void unlisten(Commit<Void> commit) {
587 listeners.remove(commit.session().sessionId().id());
588 }
589
590 /**
Jordan Halterman9fc40ed2018-06-21 00:00:15 -0700591 * Handles an open iterator commit.
Jordan Halterman21ef9e42018-05-21 22:11:07 -0700592 *
Jordan Halterman9fc40ed2018-06-21 00:00:15 -0700593 * @param commit the open iterator commit
594 * @return iterator identifier
Jordan Halterman21ef9e42018-05-21 22:11:07 -0700595 */
Jordan Halterman9fc40ed2018-06-21 00:00:15 -0700596 protected long openIterator(Commit<Void> commit) {
597 iterators.put(commit.index(), new IteratorContext(
598 commit.session().sessionId().id(),
599 backingMap.entrySet().iterator()));
600 return commit.index();
601 }
602
603 /**
604 * Handles an iterator next commit.
605 *
606 * @param commit the next commit
607 * @return a list of entries to iterate
608 */
609 protected IteratorBatch next(Commit<IteratorPosition> commit) {
610 final long iteratorId = commit.value().iteratorId();
611 final int position = commit.value().position();
612
613 IteratorContext context = iterators.get(iteratorId);
614 if (context == null) {
615 return null;
616 }
617
618 List<Map.Entry<String, byte[]>> entries = new ArrayList<>();
619 int size = 0;
620 while (context.iterator.hasNext()) {
621 context.position++;
622 if (context.position > position) {
623 Map.Entry<String, MapEntryValue> entry = context.iterator.next();
624 String key = entry.getKey();
625 int keySize = key.length();
626 for (byte[] value : entry.getValue().values()) {
627 entries.add(Maps.immutableEntry(key, value));
628 size += keySize;
629 size += value.length;
630 }
631
632 if (size >= MAX_ITERATOR_BATCH_SIZE) {
633 break;
634 }
Jordan Halterman21ef9e42018-05-21 22:11:07 -0700635 }
636 }
Jordan Halterman9fc40ed2018-06-21 00:00:15 -0700637
638 if (entries.isEmpty()) {
639 return null;
640 }
641 return new IteratorBatch(context.position, entries);
642 }
643
644 /**
645 * Handles a close iterator commit.
646 *
647 * @param commit the close iterator commit
648 */
649 protected void closeIterator(Commit<Long> commit) {
650 iterators.remove(commit.value());
Jordan Halterman21ef9e42018-05-21 22:11:07 -0700651 }
652
653 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700654 * Publishes events to listeners.
655 *
656 * @param events list of map event to publish
657 */
658 private void publish(List<MultimapEvent<String, byte[]>> events) {
659 listeners.values().forEach(session -> session.publish(CHANGE, serializer::encode, events));
660 }
661
662 private interface MapEntryValue {
663
664 /**
665 * Returns the list of raw {@code byte[]'s}.
666 *
667 * @return list of raw values
668 */
669 Collection<? extends byte[]> values();
670
671 /**
672 * Returns the version of the value.
673 *
674 * @return version
675 */
676 long version();
677
678 /**
679 * Add a new commit and modifies the set of values accordingly.
680 * In the case of a replace or removeAll it returns the set of removed
681 * values. In the case of put or multiRemove it returns null for no
682 * change and a set of the added or removed values respectively if a
683 * change resulted.
684 *
685 * @param commit the commit to be added
686 */
687 Versioned<Collection<? extends byte[]>> addCommit(
688 Commit<? extends MultimapOperation> commit);
689 }
690
691 private class NonTransactionalCommit implements MapEntryValue {
692 private long version;
693 private final TreeSet<byte[]> valueSet = Sets.newTreeSet(new ByteArrayComparator());
694
695 public NonTransactionalCommit() {
696 //Set the version to current it will only be updated once this is
697 // populated
698 this.version = globalVersion.get();
699 }
700
701 @Override
702 public Collection<? extends byte[]> values() {
703 return ImmutableSet.copyOf(valueSet);
704 }
705
706 @Override
707 public long version() {
708 return version;
709 }
710
711 @Override
712 public Versioned<Collection<? extends byte[]>> addCommit(
713 Commit<? extends MultimapOperation> commit) {
714 Preconditions.checkNotNull(commit);
715 Preconditions.checkNotNull(commit.value());
716 Versioned<Collection<? extends byte[]>> retVersion;
717
718 if (commit.value() instanceof Put) {
719 //Using a treeset here sanitizes the input, removing duplicates
720 Set<byte[]> valuesToAdd =
721 Sets.newTreeSet(new ByteArrayComparator());
722 ((Put) commit.value()).values().forEach(value -> {
723 if (!valueSet.contains(value)) {
724 valuesToAdd.add(value);
725 }
726 });
727 if (valuesToAdd.isEmpty()) {
728 //Do not increment or add the commit if no change resulted
729 return null;
730 }
731 retVersion = new Versioned<>(valuesToAdd, version);
732 valuesToAdd.forEach(value -> valueSet.add(value));
733 version++;
734 return retVersion;
735
736 } else if (commit.value() instanceof Replace) {
737 //Will this work?? Need to check before check-in!
738 Set<byte[]> removedValues = Sets.newHashSet();
739 removedValues.addAll(valueSet);
740 retVersion = new Versioned<>(removedValues, version);
741 valueSet.clear();
742 Set<byte[]> valuesToAdd =
743 Sets.newTreeSet(new ByteArrayComparator());
744 ((Replace) commit.value()).values().forEach(value -> {
745 valuesToAdd.add(value);
746 });
747 if (valuesToAdd.isEmpty()) {
748 version = globalVersion.incrementAndGet();
749 backingMap.remove(((Replace) commit.value()).key());
750 return retVersion;
751 }
752 valuesToAdd.forEach(value -> valueSet.add(value));
753 version = globalVersion.incrementAndGet();
754 return retVersion;
755
756 } else if (commit.value() instanceof RemoveAll) {
757 Set<byte[]> removed = Sets.newHashSet();
758 //We can assume here that values only appear once and so we
759 //do not need to sanitize the return for duplicates.
760 removed.addAll(valueSet);
761 retVersion = new Versioned<>(removed, version);
762 valueSet.clear();
763 //In the case of a removeAll all commits will be removed and
764 //unlike the multiRemove case we do not need to consider
765 //dependencies among additive and removal commits.
766
767 //Save the key for use after the commit is closed
768 String key = ((RemoveAll) commit.value()).key();
769 version = globalVersion.incrementAndGet();
770 backingMap.remove(key);
771 return retVersion;
772
773 } else if (commit.value() instanceof MultiRemove) {
774 //Must first calculate how many commits the removal depends on.
775 //At this time we also sanitize the removal set by adding to a
776 //set with proper handling of byte[] equality.
777 Set<byte[]> removed = Sets.newHashSet();
778 ((MultiRemove) commit.value()).values().forEach(value -> {
779 if (valueSet.contains(value)) {
780 removed.add(value);
781 }
782 });
783 //If there is nothing to be removed no action should be taken.
784 if (removed.isEmpty()) {
785 return null;
786 }
787 //Save key in case countdown results in closing the commit.
788 String removedKey = ((MultiRemove) commit.value()).key();
789 removed.forEach(removedValue -> {
790 valueSet.remove(removedValue);
791 });
792 //The version is updated locally as well as globally even if
793 //this object will be removed from the map in case any other
794 //party still holds a reference to this object.
795 retVersion = new Versioned<>(removed, version);
796 version = globalVersion.incrementAndGet();
797 if (valueSet.isEmpty()) {
798 backingMap.remove(removedKey);
799 }
800 return retVersion;
801
802 } else {
803 throw new IllegalArgumentException();
804 }
805 }
806 }
807
808 /**
809 * A collector that creates MapEntryValues and creates a multiset of all
810 * values in the map an equal number of times to the number of sets in
811 * which they participate.
812 */
813 private class HashMultisetValueCollector implements
814 Collector<MapEntryValue,
815 HashMultiset<byte[]>,
816 HashMultiset<byte[]>> {
817
818 @Override
819 public Supplier<HashMultiset<byte[]>> supplier() {
820 return HashMultiset::create;
821 }
822
823 @Override
824 public BiConsumer<HashMultiset<byte[]>, MapEntryValue> accumulator() {
825 return (multiset, mapEntryValue) ->
826 multiset.addAll(mapEntryValue.values());
827 }
828
829 @Override
830 public BinaryOperator<HashMultiset<byte[]>> combiner() {
831 return (setOne, setTwo) -> {
832 setOne.addAll(setTwo);
833 return setOne;
834 };
835 }
836
837 @Override
838 public Function<HashMultiset<byte[]>,
839 HashMultiset<byte[]>> finisher() {
840 return Function.identity();
841 }
842
843 @Override
844 public Set<Characteristics> characteristics() {
845 return EnumSet.of(Characteristics.UNORDERED);
846 }
847 }
848
849 /**
850 * A collector that creates Entries of {@code <String, MapEntryValue>} and
851 * creates a set of entries all key value pairs in the map.
852 */
853 private class EntrySetCollector implements
854 Collector<Map.Entry<String, MapEntryValue>,
855 Set<Map.Entry<String, byte[]>>,
856 Set<Map.Entry<String, byte[]>>> {
857 private Set<Map.Entry<String, byte[]>> set = null;
858
859 @Override
860 public Supplier<Set<Map.Entry<String, byte[]>>> supplier() {
861 return () -> {
862 if (set == null) {
863 set = Sets.newHashSet();
864 }
865 return set;
866 };
867 }
868
869 @Override
870 public BiConsumer<Set<Map.Entry<String, byte[]>>,
871 Map.Entry<String, MapEntryValue>> accumulator() {
872 return (set, entry) -> {
873 entry
874 .getValue()
875 .values()
876 .forEach(byteValue ->
877 set.add(Maps.immutableEntry(entry.getKey(),
878 byteValue)));
879 };
880 }
881
882 @Override
883 public BinaryOperator<Set<Map.Entry<String, byte[]>>> combiner() {
884 return (setOne, setTwo) -> {
885 setOne.addAll(setTwo);
886 return setOne;
887 };
888 }
889
890 @Override
891 public Function<Set<Map.Entry<String, byte[]>>,
892 Set<Map.Entry<String, byte[]>>> finisher() {
893 return (unused) -> set;
894 }
895
896 @Override
897 public Set<Characteristics> characteristics() {
898 return EnumSet.of(Characteristics.UNORDERED);
899 }
900 }
901 /**
902 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
903 * @param value map entry value
904 * @return versioned instance or an empty list versioned -1 if argument is
905 * null
906 */
907 private Versioned<Collection<? extends byte[]>> toVersioned(
908 MapEntryValue value) {
909 return value == null ? new Versioned<>(Lists.newArrayList(), -1) :
910 new Versioned<>(value.values(),
911 value.version());
912 }
913
914 private static class ByteArrayComparator implements Comparator<byte[]> {
915
916 @Override
917 public int compare(byte[] o1, byte[] o2) {
918 if (Arrays.equals(o1, o2)) {
919 return 0;
920 } else {
921 for (int i = 0; i < o1.length && i < o2.length; i++) {
922 if (o1[i] < o2[i]) {
923 return -1;
924 } else if (o1[i] > o2[i]) {
925 return 1;
926 }
927 }
928 return o1.length > o2.length ? 1 : -1;
929 }
930 }
931 }
Jordan Halterman9fc40ed2018-06-21 00:00:15 -0700932
933 private static class IteratorContext {
934 private final long sessionId;
935 private final Iterator<Map.Entry<String, MapEntryValue>> iterator;
936 private int position = 0;
937
938 IteratorContext(long sessionId, Iterator<Map.Entry<String, MapEntryValue>> iterator) {
939 this.sessionId = sessionId;
940 this.iterator = iterator;
941 }
942 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700943}