blob: 5a93bef9b1ad0098d6c3f9699dd5996de664219a [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jordan Halterman2bf177c2017-06-29 01:49:08 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import java.util.Arrays;
20import java.util.Collection;
21import java.util.Comparator;
22import java.util.EnumSet;
23import java.util.HashMap;
24import java.util.LinkedHashMap;
25import java.util.List;
26import java.util.Map;
27import java.util.Set;
28import java.util.TreeSet;
29import java.util.concurrent.atomic.AtomicLong;
30import java.util.function.BiConsumer;
31import java.util.function.BinaryOperator;
32import java.util.function.Function;
33import java.util.function.Supplier;
34import java.util.stream.Collector;
35import java.util.stream.Collectors;
36
37import com.esotericsoftware.kryo.Kryo;
38import com.esotericsoftware.kryo.io.Input;
39import com.esotericsoftware.kryo.io.Output;
40import com.google.common.base.Preconditions;
41import com.google.common.collect.HashMultiset;
42import com.google.common.collect.ImmutableSet;
43import com.google.common.collect.Lists;
44import com.google.common.collect.Maps;
45import com.google.common.collect.Multiset;
46import com.google.common.collect.Sets;
47import io.atomix.protocols.raft.service.AbstractRaftService;
48import io.atomix.protocols.raft.service.Commit;
49import io.atomix.protocols.raft.service.RaftServiceExecutor;
50import io.atomix.protocols.raft.session.RaftSession;
51import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
52import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
53import org.onlab.util.KryoNamespace;
54import org.onlab.util.Match;
55import org.onosproject.store.serializers.KryoNamespaces;
56import org.onosproject.store.service.MultimapEvent;
57import org.onosproject.store.service.Serializer;
58import org.onosproject.store.service.Versioned;
59
60import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
Jordan Halterman5e884352018-05-21 22:11:07 -070061import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070062import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsEntry;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsKey;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsValue;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ENTRIES;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
Jordan Halterman5e884352018-05-21 22:11:07 -070074import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070075import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
76import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
77import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
78import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
79import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
Jordan Halterman8c57a092018-06-04 14:53:06 -070080import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070081import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
82import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
83import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
Jordan Halterman8c57a092018-06-04 14:53:06 -070084import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070085import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
86import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
87import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
88import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Replace;
89import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.SIZE;
90import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.VALUES;
91
92/**
93 * State Machine for {@link AtomixConsistentSetMultimap} resource.
94 */
95public class AtomixConsistentSetMultimapService extends AbstractRaftService {
96
97 private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
98 .register(KryoNamespaces.BASIC)
99 .register(AtomixConsistentSetMultimapOperations.NAMESPACE)
100 .register(AtomixConsistentSetMultimapEvents.NAMESPACE)
101 .register(ByteArrayComparator.class)
102 .register(new HashMap().keySet().getClass())
103 .register(TreeSet.class)
104 .register(new com.esotericsoftware.kryo.Serializer<NonTransactionalCommit>() {
105 @Override
106 public void write(Kryo kryo, Output output, NonTransactionalCommit object) {
107 kryo.writeClassAndObject(output, object.valueSet);
108 }
109
110 @Override
111 @SuppressWarnings("unchecked")
112 public NonTransactionalCommit read(Kryo kryo, Input input, Class<NonTransactionalCommit> type) {
113 NonTransactionalCommit commit = new NonTransactionalCommit();
114 commit.valueSet.addAll((Collection<byte[]>) kryo.readClassAndObject(input));
115 return commit;
116 }
117 }, NonTransactionalCommit.class)
118 .build());
119
120 private AtomicLong globalVersion = new AtomicLong(1);
121 private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
122 private Map<String, MapEntryValue> backingMap = Maps.newHashMap();
123
124 @Override
125 public void snapshot(SnapshotWriter writer) {
126 writer.writeLong(globalVersion.get());
127 writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer::encode);
128 writer.writeObject(backingMap, serializer::encode);
129 }
130
131 @Override
132 public void install(SnapshotReader reader) {
133 globalVersion = new AtomicLong(reader.readLong());
134
135 listeners = new LinkedHashMap<>();
136 for (Long sessionId : reader.<Set<Long>>readObject(serializer::decode)) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700137 listeners.put(sessionId, sessions().getSession(sessionId));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700138 }
139
140 backingMap = reader.readObject(serializer::decode);
141 }
142
143 @Override
144 protected void configure(RaftServiceExecutor executor) {
145 executor.register(SIZE, this::size, serializer::encode);
146 executor.register(IS_EMPTY, this::isEmpty, serializer::encode);
147 executor.register(CONTAINS_KEY, serializer::decode, this::containsKey, serializer::encode);
148 executor.register(CONTAINS_VALUE, serializer::decode, this::containsValue, serializer::encode);
149 executor.register(CONTAINS_ENTRY, serializer::decode, this::containsEntry, serializer::encode);
150 executor.register(CLEAR, this::clear);
151 executor.register(KEY_SET, this::keySet, serializer::encode);
152 executor.register(KEYS, this::keys, serializer::encode);
153 executor.register(VALUES, this::values, serializer::encode);
154 executor.register(ENTRIES, this::entries, serializer::encode);
155 executor.register(GET, serializer::decode, this::get, serializer::encode);
156 executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode);
157 executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode);
Jordan Halterman8c57a092018-06-04 14:53:06 -0700158 executor.register(REMOVE_AND_GET, serializer::decode, this::removeAndGet, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700159 executor.register(PUT, serializer::decode, this::put, serializer::encode);
Jordan Halterman8c57a092018-06-04 14:53:06 -0700160 executor.register(PUT_AND_GET, serializer::decode, this::putAndGet, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700161 executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
162 executor.register(ADD_LISTENER, this::listen);
163 executor.register(REMOVE_LISTENER, this::unlisten);
Jordan Halterman5e884352018-05-21 22:11:07 -0700164 executor.register(ITERATE, this::iterate, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700165 }
166
167 @Override
168 public void onExpire(RaftSession session) {
169 listeners.remove(session.sessionId().id());
170 }
171
172 @Override
173 public void onClose(RaftSession session) {
174 listeners.remove(session.sessionId().id());
175 }
176
177 /**
178 * Handles a Size commit.
179 *
180 * @param commit Size commit
181 * @return number of unique key value pairs in the multimap
182 */
183 protected int size(Commit<Void> commit) {
184 return backingMap.values()
185 .stream()
186 .map(valueCollection -> valueCollection.values().size())
187 .collect(Collectors.summingInt(size -> size));
188 }
189
190 /**
191 * Handles an IsEmpty commit.
192 *
193 * @param commit IsEmpty commit
194 * @return true if the multimap contains no key-value pairs, else false
195 */
196 protected boolean isEmpty(Commit<Void> commit) {
197 return backingMap.isEmpty();
198 }
199
200 /**
201 * Handles a contains key commit.
202 *
203 * @param commit ContainsKey commit
204 * @return returns true if the key is in the multimap, else false
205 */
206 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
207 return backingMap.containsKey(commit.value().key());
208 }
209
210 /**
211 * Handles a ContainsValue commit.
212 *
213 * @param commit ContainsValue commit
214 * @return true if the value is in the multimap, else false
215 */
216 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
217 if (backingMap.values().isEmpty()) {
218 return false;
219 }
220 Match<byte[]> match = Match.ifValue(commit.value().value());
221 return backingMap
222 .values()
223 .stream()
224 .anyMatch(valueList ->
225 valueList
226 .values()
227 .stream()
228 .anyMatch(byteValue ->
229 match.matches(byteValue)));
230 }
231
232 /**
233 * Handles a ContainsEntry commit.
234 *
235 * @param commit ContainsEntry commit
236 * @return true if the key-value pair exists, else false
237 */
238 protected boolean containsEntry(Commit<? extends ContainsEntry> commit) {
239 MapEntryValue entryValue =
240 backingMap.get(commit.value().key());
241 if (entryValue == null) {
242 return false;
243 } else {
244 Match valueMatch = Match.ifValue(commit.value().value());
245 return entryValue
246 .values()
247 .stream()
248 .anyMatch(byteValue -> valueMatch.matches(byteValue));
249 }
250 }
251
252 /**
253 * Handles a Clear commit.
254 *
255 * @param commit Clear commit
256 */
257 protected void clear(Commit<Void> commit) {
258 backingMap.clear();
259 }
260
261 /**
262 * Handles a KeySet commit.
263 *
264 * @param commit KeySet commit
265 * @return a set of all keys in the multimap
266 */
267 protected Set<String> keySet(Commit<Void> commit) {
268 return ImmutableSet.copyOf(backingMap.keySet());
269 }
270
271 /**
272 * Handles a Keys commit.
273 *
274 * @param commit Keys commit
275 * @return a multiset of keys with each key included an equal number of
276 * times to the total key-value pairs in which that key participates
277 */
278 protected Multiset<String> keys(Commit<Void> commit) {
279 Multiset keys = HashMultiset.create();
280 backingMap.forEach((key, mapEntryValue) -> {
281 keys.add(key, mapEntryValue.values().size());
282 });
283 return keys;
284 }
285
286 /**
287 * Handles a Values commit.
288 *
289 * @param commit Values commit
290 * @return the set of values in the multimap with duplicates included
291 */
292 protected Multiset<byte[]> values(Commit<Void> commit) {
293 return backingMap
294 .values()
295 .stream()
296 .collect(new HashMultisetValueCollector());
297 }
298
299 /**
300 * Handles an Entries commit.
301 *
302 * @param commit Entries commit
303 * @return a set of all key-value pairs in the multimap
304 */
305 protected Collection<Map.Entry<String, byte[]>> entries(Commit<Void> commit) {
306 return backingMap
307 .entrySet()
308 .stream()
309 .collect(new EntrySetCollector());
310 }
311
312 /**
313 * Handles a Get commit.
314 *
315 * @param commit Get commit
316 * @return the collection of values associated with the key or an empty
317 * list if none exist
318 */
319 protected Versioned<Collection<? extends byte[]>> get(Commit<? extends Get> commit) {
320 return toVersioned(backingMap.get(commit.value().key()));
321 }
322
323 /**
324 * Handles a removeAll commit, and returns the previous mapping.
325 *
326 * @param commit removeAll commit
327 * @return collection of removed values
328 */
329 protected Versioned<Collection<? extends byte[]>> removeAll(Commit<? extends RemoveAll> commit) {
330 String key = commit.value().key();
331
332 if (!backingMap.containsKey(key)) {
333 return new Versioned<>(Sets.newHashSet(), -1);
334 }
335
336 Versioned<Collection<? extends byte[]>> removedValues =
337 backingMap.get(key).addCommit(commit);
338 publish(removedValues.value().stream()
339 .map(value -> new MultimapEvent<String, byte[]>(
340 "", key, null, value))
341 .collect(Collectors.toList()));
342 return removedValues;
343 }
344
345 /**
346 * Handles a multiRemove commit, returns true if the remove results in any
347 * change.
348 * @param commit multiRemove commit
349 * @return true if any change results, else false
350 */
351 protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
352 String key = commit.value().key();
353
354 if (!backingMap.containsKey(key)) {
355 return false;
356 }
357
358 Versioned<Collection<? extends byte[]>> removedValues = backingMap
Jordan Halterman8c57a092018-06-04 14:53:06 -0700359 .get(key)
360 .addCommit(commit);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700361
362 if (removedValues != null) {
363 if (removedValues.value().isEmpty()) {
364 backingMap.remove(key);
365 }
366
367 publish(removedValues.value().stream()
Jordan Halterman8c57a092018-06-04 14:53:06 -0700368 .map(value -> new MultimapEvent<String, byte[]>(
369 "", key, null, value))
370 .collect(Collectors.toList()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700371 return true;
372 }
373
374 return false;
375 }
376
377 /**
Jordan Halterman8c57a092018-06-04 14:53:06 -0700378 * Handles a removeAndGet commit.
379 *
380 * @param commit multiRemove commit
381 * @return the updated values or null if the values are empty
382 */
383 protected Versioned<Collection<? extends byte[]>> removeAndGet(Commit<? extends MultiRemove> commit) {
384 String key = commit.value().key();
385
386 if (!backingMap.containsKey(key)) {
387 return null;
388 }
389
390 Versioned<Collection<? extends byte[]>> removedValues = backingMap
391 .get(key)
392 .addCommit(commit);
393
394 if (removedValues != null) {
395 if (removedValues.value().isEmpty()) {
396 backingMap.remove(key);
397 }
398
399 publish(removedValues.value().stream()
400 .map(value -> new MultimapEvent<String, byte[]>(
401 "", key, null, value))
402 .collect(Collectors.toList()));
403 }
404
405 return toVersioned(backingMap.get(key));
406 }
407
408 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700409 * Handles a put commit, returns true if any change results from this
410 * commit.
411 * @param commit a put commit
412 * @return true if this commit results in a change, else false
413 */
414 protected boolean put(Commit<? extends Put> commit) {
415 String key = commit.value().key();
416 if (commit.value().values().isEmpty()) {
417 return false;
418 }
419 if (!backingMap.containsKey(key)) {
420 backingMap.put(key, new NonTransactionalCommit());
421 }
422
423 Versioned<Collection<? extends byte[]>> addedValues = backingMap
Jordan Halterman8c57a092018-06-04 14:53:06 -0700424 .get(key)
425 .addCommit(commit);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700426
427 if (addedValues != null) {
428 publish(addedValues.value().stream()
Jordan Halterman8c57a092018-06-04 14:53:06 -0700429 .map(value -> new MultimapEvent<String, byte[]>(
430 "", key, value, null))
431 .collect(Collectors.toList()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700432 return true;
433 }
434
435 return false;
436 }
437
Jordan Halterman8c57a092018-06-04 14:53:06 -0700438 /**
439 * Handles a putAndGet commit.
440 *
441 * @param commit a put commit
442 * @return the updated values
443 */
444 protected Versioned<Collection<? extends byte[]>> putAndGet(Commit<? extends Put> commit) {
445 String key = commit.value().key();
446 if (commit.value().values().isEmpty()) {
447 return null;
448 }
449 if (!backingMap.containsKey(key)) {
450 backingMap.put(key, new NonTransactionalCommit());
451 }
452
453 Versioned<Collection<? extends byte[]>> addedValues = backingMap
454 .get(key)
455 .addCommit(commit);
456
457 if (addedValues != null) {
458 publish(addedValues.value().stream()
459 .map(value -> new MultimapEvent<String, byte[]>(
460 "", key, value, null))
461 .collect(Collectors.toList()));
462 }
463
464 return toVersioned(backingMap.get(key));
465 }
466
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700467 protected Versioned<Collection<? extends byte[]>> replace(
468 Commit<? extends Replace> commit) {
469 if (!backingMap.containsKey(commit.value().key())) {
470 backingMap.put(commit.value().key(),
471 new NonTransactionalCommit());
472 }
473 return backingMap.get(commit.value().key()).addCommit(commit);
474 }
475
476 /**
477 * Handles a listen commit.
478 *
479 * @param commit listen commit
480 */
481 protected void listen(Commit<Void> commit) {
482 listeners.put(commit.session().sessionId().id(), commit.session());
483 }
484
485 /**
486 * Handles an unlisten commit.
487 *
488 * @param commit unlisten commit
489 */
490 protected void unlisten(Commit<Void> commit) {
491 listeners.remove(commit.session().sessionId().id());
492 }
493
494 /**
Jordan Halterman5e884352018-05-21 22:11:07 -0700495 * Handles an iterate commit.
496 *
497 * @param commit the iterate commit
498 * @return count of commit entries
499 */
500 protected int iterate(Commit<Void> commit) {
501 int count = 0;
502 for (Map.Entry<String, MapEntryValue> entry : backingMap.entrySet()) {
503 for (byte[] value : entry.getValue().values()) {
504 commit.session().publish(ENTRY, serializer::encode, Maps.immutableEntry(entry.getKey(), value));
505 count++;
506 }
507 }
508 return count;
509 }
510
511 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700512 * Publishes events to listeners.
513 *
514 * @param events list of map event to publish
515 */
516 private void publish(List<MultimapEvent<String, byte[]>> events) {
517 listeners.values().forEach(session -> session.publish(CHANGE, serializer::encode, events));
518 }
519
520 private interface MapEntryValue {
521
522 /**
523 * Returns the list of raw {@code byte[]'s}.
524 *
525 * @return list of raw values
526 */
527 Collection<? extends byte[]> values();
528
529 /**
530 * Returns the version of the value.
531 *
532 * @return version
533 */
534 long version();
535
536 /**
537 * Add a new commit and modifies the set of values accordingly.
538 * In the case of a replace or removeAll it returns the set of removed
539 * values. In the case of put or multiRemove it returns null for no
540 * change and a set of the added or removed values respectively if a
541 * change resulted.
542 *
543 * @param commit the commit to be added
544 */
545 Versioned<Collection<? extends byte[]>> addCommit(
546 Commit<? extends MultimapOperation> commit);
547 }
548
549 private class NonTransactionalCommit implements MapEntryValue {
550 private long version;
551 private final TreeSet<byte[]> valueSet = Sets.newTreeSet(new ByteArrayComparator());
552
553 public NonTransactionalCommit() {
554 //Set the version to current it will only be updated once this is
555 // populated
556 this.version = globalVersion.get();
557 }
558
559 @Override
560 public Collection<? extends byte[]> values() {
561 return ImmutableSet.copyOf(valueSet);
562 }
563
564 @Override
565 public long version() {
566 return version;
567 }
568
569 @Override
570 public Versioned<Collection<? extends byte[]>> addCommit(
571 Commit<? extends MultimapOperation> commit) {
572 Preconditions.checkNotNull(commit);
573 Preconditions.checkNotNull(commit.value());
574 Versioned<Collection<? extends byte[]>> retVersion;
575
576 if (commit.value() instanceof Put) {
577 //Using a treeset here sanitizes the input, removing duplicates
578 Set<byte[]> valuesToAdd =
579 Sets.newTreeSet(new ByteArrayComparator());
580 ((Put) commit.value()).values().forEach(value -> {
581 if (!valueSet.contains(value)) {
582 valuesToAdd.add(value);
583 }
584 });
585 if (valuesToAdd.isEmpty()) {
586 //Do not increment or add the commit if no change resulted
587 return null;
588 }
589 retVersion = new Versioned<>(valuesToAdd, version);
590 valuesToAdd.forEach(value -> valueSet.add(value));
591 version++;
592 return retVersion;
593
594 } else if (commit.value() instanceof Replace) {
595 //Will this work?? Need to check before check-in!
596 Set<byte[]> removedValues = Sets.newHashSet();
597 removedValues.addAll(valueSet);
598 retVersion = new Versioned<>(removedValues, version);
599 valueSet.clear();
600 Set<byte[]> valuesToAdd =
601 Sets.newTreeSet(new ByteArrayComparator());
602 ((Replace) commit.value()).values().forEach(value -> {
603 valuesToAdd.add(value);
604 });
605 if (valuesToAdd.isEmpty()) {
606 version = globalVersion.incrementAndGet();
607 backingMap.remove(((Replace) commit.value()).key());
608 return retVersion;
609 }
610 valuesToAdd.forEach(value -> valueSet.add(value));
611 version = globalVersion.incrementAndGet();
612 return retVersion;
613
614 } else if (commit.value() instanceof RemoveAll) {
615 Set<byte[]> removed = Sets.newHashSet();
616 //We can assume here that values only appear once and so we
617 //do not need to sanitize the return for duplicates.
618 removed.addAll(valueSet);
619 retVersion = new Versioned<>(removed, version);
620 valueSet.clear();
621 //In the case of a removeAll all commits will be removed and
622 //unlike the multiRemove case we do not need to consider
623 //dependencies among additive and removal commits.
624
625 //Save the key for use after the commit is closed
626 String key = ((RemoveAll) commit.value()).key();
627 version = globalVersion.incrementAndGet();
628 backingMap.remove(key);
629 return retVersion;
630
631 } else if (commit.value() instanceof MultiRemove) {
632 //Must first calculate how many commits the removal depends on.
633 //At this time we also sanitize the removal set by adding to a
634 //set with proper handling of byte[] equality.
635 Set<byte[]> removed = Sets.newHashSet();
636 ((MultiRemove) commit.value()).values().forEach(value -> {
637 if (valueSet.contains(value)) {
638 removed.add(value);
639 }
640 });
641 //If there is nothing to be removed no action should be taken.
642 if (removed.isEmpty()) {
643 return null;
644 }
645 //Save key in case countdown results in closing the commit.
646 String removedKey = ((MultiRemove) commit.value()).key();
647 removed.forEach(removedValue -> {
648 valueSet.remove(removedValue);
649 });
650 //The version is updated locally as well as globally even if
651 //this object will be removed from the map in case any other
652 //party still holds a reference to this object.
653 retVersion = new Versioned<>(removed, version);
654 version = globalVersion.incrementAndGet();
655 if (valueSet.isEmpty()) {
656 backingMap.remove(removedKey);
657 }
658 return retVersion;
659
660 } else {
661 throw new IllegalArgumentException();
662 }
663 }
664 }
665
666 /**
667 * A collector that creates MapEntryValues and creates a multiset of all
668 * values in the map an equal number of times to the number of sets in
669 * which they participate.
670 */
671 private class HashMultisetValueCollector implements
672 Collector<MapEntryValue,
673 HashMultiset<byte[]>,
674 HashMultiset<byte[]>> {
675
676 @Override
677 public Supplier<HashMultiset<byte[]>> supplier() {
678 return HashMultiset::create;
679 }
680
681 @Override
682 public BiConsumer<HashMultiset<byte[]>, MapEntryValue> accumulator() {
683 return (multiset, mapEntryValue) ->
684 multiset.addAll(mapEntryValue.values());
685 }
686
687 @Override
688 public BinaryOperator<HashMultiset<byte[]>> combiner() {
689 return (setOne, setTwo) -> {
690 setOne.addAll(setTwo);
691 return setOne;
692 };
693 }
694
695 @Override
696 public Function<HashMultiset<byte[]>,
697 HashMultiset<byte[]>> finisher() {
698 return Function.identity();
699 }
700
701 @Override
702 public Set<Characteristics> characteristics() {
703 return EnumSet.of(Characteristics.UNORDERED);
704 }
705 }
706
707 /**
708 * A collector that creates Entries of {@code <String, MapEntryValue>} and
709 * creates a set of entries all key value pairs in the map.
710 */
711 private class EntrySetCollector implements
712 Collector<Map.Entry<String, MapEntryValue>,
713 Set<Map.Entry<String, byte[]>>,
714 Set<Map.Entry<String, byte[]>>> {
715 private Set<Map.Entry<String, byte[]>> set = null;
716
717 @Override
718 public Supplier<Set<Map.Entry<String, byte[]>>> supplier() {
719 return () -> {
720 if (set == null) {
721 set = Sets.newHashSet();
722 }
723 return set;
724 };
725 }
726
727 @Override
728 public BiConsumer<Set<Map.Entry<String, byte[]>>,
729 Map.Entry<String, MapEntryValue>> accumulator() {
730 return (set, entry) -> {
731 entry
732 .getValue()
733 .values()
734 .forEach(byteValue ->
735 set.add(Maps.immutableEntry(entry.getKey(),
736 byteValue)));
737 };
738 }
739
740 @Override
741 public BinaryOperator<Set<Map.Entry<String, byte[]>>> combiner() {
742 return (setOne, setTwo) -> {
743 setOne.addAll(setTwo);
744 return setOne;
745 };
746 }
747
748 @Override
749 public Function<Set<Map.Entry<String, byte[]>>,
750 Set<Map.Entry<String, byte[]>>> finisher() {
751 return (unused) -> set;
752 }
753
754 @Override
755 public Set<Characteristics> characteristics() {
756 return EnumSet.of(Characteristics.UNORDERED);
757 }
758 }
759 /**
760 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
761 * @param value map entry value
762 * @return versioned instance or an empty list versioned -1 if argument is
763 * null
764 */
765 private Versioned<Collection<? extends byte[]>> toVersioned(
766 MapEntryValue value) {
767 return value == null ? new Versioned<>(Lists.newArrayList(), -1) :
768 new Versioned<>(value.values(),
769 value.version());
770 }
771
772 private static class ByteArrayComparator implements Comparator<byte[]> {
773
774 @Override
775 public int compare(byte[] o1, byte[] o2) {
776 if (Arrays.equals(o1, o2)) {
777 return 0;
778 } else {
779 for (int i = 0; i < o1.length && i < o2.length; i++) {
780 if (o1[i] < o2[i]) {
781 return -1;
782 } else if (o1[i] > o2[i]) {
783 return 1;
784 }
785 }
786 return o1.length > o2.length ? 1 : -1;
787 }
788 }
789 }
790}