blob: 2312d8f4b76bcc9b884f235cb536207a3cc31468 [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jordan Halterman2bf177c2017-06-29 01:49:08 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import java.util.Arrays;
20import java.util.Collection;
21import java.util.Comparator;
22import java.util.EnumSet;
23import java.util.HashMap;
24import java.util.LinkedHashMap;
25import java.util.List;
26import java.util.Map;
27import java.util.Set;
28import java.util.TreeSet;
29import java.util.concurrent.atomic.AtomicLong;
30import java.util.function.BiConsumer;
31import java.util.function.BinaryOperator;
32import java.util.function.Function;
33import java.util.function.Supplier;
34import java.util.stream.Collector;
35import java.util.stream.Collectors;
36
37import com.esotericsoftware.kryo.Kryo;
38import com.esotericsoftware.kryo.io.Input;
39import com.esotericsoftware.kryo.io.Output;
40import com.google.common.base.Preconditions;
41import com.google.common.collect.HashMultiset;
42import com.google.common.collect.ImmutableSet;
43import com.google.common.collect.Lists;
44import com.google.common.collect.Maps;
45import com.google.common.collect.Multiset;
46import com.google.common.collect.Sets;
47import io.atomix.protocols.raft.service.AbstractRaftService;
48import io.atomix.protocols.raft.service.Commit;
49import io.atomix.protocols.raft.service.RaftServiceExecutor;
50import io.atomix.protocols.raft.session.RaftSession;
51import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
52import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
53import org.onlab.util.KryoNamespace;
54import org.onlab.util.Match;
55import org.onosproject.store.serializers.KryoNamespaces;
56import org.onosproject.store.service.MultimapEvent;
57import org.onosproject.store.service.Serializer;
58import org.onosproject.store.service.Versioned;
59
60import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
Jordan Halterman5e884352018-05-21 22:11:07 -070061import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070062import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsEntry;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsKey;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsValue;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ENTRIES;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
Jordan Halterman5e884352018-05-21 22:11:07 -070074import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070075import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
76import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
77import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
78import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
79import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
80import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
81import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
82import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
83import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
84import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
85import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
86import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Replace;
87import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.SIZE;
88import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.VALUES;
89
90/**
91 * State Machine for {@link AtomixConsistentSetMultimap} resource.
92 */
93public class AtomixConsistentSetMultimapService extends AbstractRaftService {
94
95 private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
96 .register(KryoNamespaces.BASIC)
97 .register(AtomixConsistentSetMultimapOperations.NAMESPACE)
98 .register(AtomixConsistentSetMultimapEvents.NAMESPACE)
99 .register(ByteArrayComparator.class)
100 .register(new HashMap().keySet().getClass())
101 .register(TreeSet.class)
102 .register(new com.esotericsoftware.kryo.Serializer<NonTransactionalCommit>() {
103 @Override
104 public void write(Kryo kryo, Output output, NonTransactionalCommit object) {
105 kryo.writeClassAndObject(output, object.valueSet);
106 }
107
108 @Override
109 @SuppressWarnings("unchecked")
110 public NonTransactionalCommit read(Kryo kryo, Input input, Class<NonTransactionalCommit> type) {
111 NonTransactionalCommit commit = new NonTransactionalCommit();
112 commit.valueSet.addAll((Collection<byte[]>) kryo.readClassAndObject(input));
113 return commit;
114 }
115 }, NonTransactionalCommit.class)
116 .build());
117
118 private AtomicLong globalVersion = new AtomicLong(1);
119 private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
120 private Map<String, MapEntryValue> backingMap = Maps.newHashMap();
121
122 @Override
123 public void snapshot(SnapshotWriter writer) {
124 writer.writeLong(globalVersion.get());
125 writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer::encode);
126 writer.writeObject(backingMap, serializer::encode);
127 }
128
129 @Override
130 public void install(SnapshotReader reader) {
131 globalVersion = new AtomicLong(reader.readLong());
132
133 listeners = new LinkedHashMap<>();
134 for (Long sessionId : reader.<Set<Long>>readObject(serializer::decode)) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700135 listeners.put(sessionId, sessions().getSession(sessionId));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700136 }
137
138 backingMap = reader.readObject(serializer::decode);
139 }
140
141 @Override
142 protected void configure(RaftServiceExecutor executor) {
143 executor.register(SIZE, this::size, serializer::encode);
144 executor.register(IS_EMPTY, this::isEmpty, serializer::encode);
145 executor.register(CONTAINS_KEY, serializer::decode, this::containsKey, serializer::encode);
146 executor.register(CONTAINS_VALUE, serializer::decode, this::containsValue, serializer::encode);
147 executor.register(CONTAINS_ENTRY, serializer::decode, this::containsEntry, serializer::encode);
148 executor.register(CLEAR, this::clear);
149 executor.register(KEY_SET, this::keySet, serializer::encode);
150 executor.register(KEYS, this::keys, serializer::encode);
151 executor.register(VALUES, this::values, serializer::encode);
152 executor.register(ENTRIES, this::entries, serializer::encode);
153 executor.register(GET, serializer::decode, this::get, serializer::encode);
154 executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode);
155 executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode);
156 executor.register(PUT, serializer::decode, this::put, serializer::encode);
157 executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
158 executor.register(ADD_LISTENER, this::listen);
159 executor.register(REMOVE_LISTENER, this::unlisten);
Jordan Halterman5e884352018-05-21 22:11:07 -0700160 executor.register(ITERATE, this::iterate, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700161 }
162
163 @Override
164 public void onExpire(RaftSession session) {
165 listeners.remove(session.sessionId().id());
166 }
167
168 @Override
169 public void onClose(RaftSession session) {
170 listeners.remove(session.sessionId().id());
171 }
172
173 /**
174 * Handles a Size commit.
175 *
176 * @param commit Size commit
177 * @return number of unique key value pairs in the multimap
178 */
179 protected int size(Commit<Void> commit) {
180 return backingMap.values()
181 .stream()
182 .map(valueCollection -> valueCollection.values().size())
183 .collect(Collectors.summingInt(size -> size));
184 }
185
186 /**
187 * Handles an IsEmpty commit.
188 *
189 * @param commit IsEmpty commit
190 * @return true if the multimap contains no key-value pairs, else false
191 */
192 protected boolean isEmpty(Commit<Void> commit) {
193 return backingMap.isEmpty();
194 }
195
196 /**
197 * Handles a contains key commit.
198 *
199 * @param commit ContainsKey commit
200 * @return returns true if the key is in the multimap, else false
201 */
202 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
203 return backingMap.containsKey(commit.value().key());
204 }
205
206 /**
207 * Handles a ContainsValue commit.
208 *
209 * @param commit ContainsValue commit
210 * @return true if the value is in the multimap, else false
211 */
212 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
213 if (backingMap.values().isEmpty()) {
214 return false;
215 }
216 Match<byte[]> match = Match.ifValue(commit.value().value());
217 return backingMap
218 .values()
219 .stream()
220 .anyMatch(valueList ->
221 valueList
222 .values()
223 .stream()
224 .anyMatch(byteValue ->
225 match.matches(byteValue)));
226 }
227
228 /**
229 * Handles a ContainsEntry commit.
230 *
231 * @param commit ContainsEntry commit
232 * @return true if the key-value pair exists, else false
233 */
234 protected boolean containsEntry(Commit<? extends ContainsEntry> commit) {
235 MapEntryValue entryValue =
236 backingMap.get(commit.value().key());
237 if (entryValue == null) {
238 return false;
239 } else {
240 Match valueMatch = Match.ifValue(commit.value().value());
241 return entryValue
242 .values()
243 .stream()
244 .anyMatch(byteValue -> valueMatch.matches(byteValue));
245 }
246 }
247
248 /**
249 * Handles a Clear commit.
250 *
251 * @param commit Clear commit
252 */
253 protected void clear(Commit<Void> commit) {
254 backingMap.clear();
255 }
256
257 /**
258 * Handles a KeySet commit.
259 *
260 * @param commit KeySet commit
261 * @return a set of all keys in the multimap
262 */
263 protected Set<String> keySet(Commit<Void> commit) {
264 return ImmutableSet.copyOf(backingMap.keySet());
265 }
266
267 /**
268 * Handles a Keys commit.
269 *
270 * @param commit Keys commit
271 * @return a multiset of keys with each key included an equal number of
272 * times to the total key-value pairs in which that key participates
273 */
274 protected Multiset<String> keys(Commit<Void> commit) {
275 Multiset keys = HashMultiset.create();
276 backingMap.forEach((key, mapEntryValue) -> {
277 keys.add(key, mapEntryValue.values().size());
278 });
279 return keys;
280 }
281
282 /**
283 * Handles a Values commit.
284 *
285 * @param commit Values commit
286 * @return the set of values in the multimap with duplicates included
287 */
288 protected Multiset<byte[]> values(Commit<Void> commit) {
289 return backingMap
290 .values()
291 .stream()
292 .collect(new HashMultisetValueCollector());
293 }
294
295 /**
296 * Handles an Entries commit.
297 *
298 * @param commit Entries commit
299 * @return a set of all key-value pairs in the multimap
300 */
301 protected Collection<Map.Entry<String, byte[]>> entries(Commit<Void> commit) {
302 return backingMap
303 .entrySet()
304 .stream()
305 .collect(new EntrySetCollector());
306 }
307
308 /**
309 * Handles a Get commit.
310 *
311 * @param commit Get commit
312 * @return the collection of values associated with the key or an empty
313 * list if none exist
314 */
315 protected Versioned<Collection<? extends byte[]>> get(Commit<? extends Get> commit) {
316 return toVersioned(backingMap.get(commit.value().key()));
317 }
318
319 /**
320 * Handles a removeAll commit, and returns the previous mapping.
321 *
322 * @param commit removeAll commit
323 * @return collection of removed values
324 */
325 protected Versioned<Collection<? extends byte[]>> removeAll(Commit<? extends RemoveAll> commit) {
326 String key = commit.value().key();
327
328 if (!backingMap.containsKey(key)) {
329 return new Versioned<>(Sets.newHashSet(), -1);
330 }
331
332 Versioned<Collection<? extends byte[]>> removedValues =
333 backingMap.get(key).addCommit(commit);
334 publish(removedValues.value().stream()
335 .map(value -> new MultimapEvent<String, byte[]>(
336 "", key, null, value))
337 .collect(Collectors.toList()));
338 return removedValues;
339 }
340
341 /**
342 * Handles a multiRemove commit, returns true if the remove results in any
343 * change.
344 * @param commit multiRemove commit
345 * @return true if any change results, else false
346 */
347 protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
348 String key = commit.value().key();
349
350 if (!backingMap.containsKey(key)) {
351 return false;
352 }
353
354 Versioned<Collection<? extends byte[]>> removedValues = backingMap
355 .get(key)
356 .addCommit(commit);
357
358 if (removedValues != null) {
359 if (removedValues.value().isEmpty()) {
360 backingMap.remove(key);
361 }
362
363 publish(removedValues.value().stream()
364 .map(value -> new MultimapEvent<String, byte[]>(
365 "", key, null, value))
366 .collect(Collectors.toList()));
367 return true;
368 }
369
370 return false;
371 }
372
373 /**
374 * Handles a put commit, returns true if any change results from this
375 * commit.
376 * @param commit a put commit
377 * @return true if this commit results in a change, else false
378 */
379 protected boolean put(Commit<? extends Put> commit) {
380 String key = commit.value().key();
381 if (commit.value().values().isEmpty()) {
382 return false;
383 }
384 if (!backingMap.containsKey(key)) {
385 backingMap.put(key, new NonTransactionalCommit());
386 }
387
388 Versioned<Collection<? extends byte[]>> addedValues = backingMap
389 .get(key)
390 .addCommit(commit);
391
392 if (addedValues != null) {
393 publish(addedValues.value().stream()
394 .map(value -> new MultimapEvent<String, byte[]>(
395 "", key, value, null))
396 .collect(Collectors.toList()));
397 return true;
398 }
399
400 return false;
401 }
402
403 protected Versioned<Collection<? extends byte[]>> replace(
404 Commit<? extends Replace> commit) {
405 if (!backingMap.containsKey(commit.value().key())) {
406 backingMap.put(commit.value().key(),
407 new NonTransactionalCommit());
408 }
409 return backingMap.get(commit.value().key()).addCommit(commit);
410 }
411
412 /**
413 * Handles a listen commit.
414 *
415 * @param commit listen commit
416 */
417 protected void listen(Commit<Void> commit) {
418 listeners.put(commit.session().sessionId().id(), commit.session());
419 }
420
421 /**
422 * Handles an unlisten commit.
423 *
424 * @param commit unlisten commit
425 */
426 protected void unlisten(Commit<Void> commit) {
427 listeners.remove(commit.session().sessionId().id());
428 }
429
430 /**
Jordan Halterman5e884352018-05-21 22:11:07 -0700431 * Handles an iterate commit.
432 *
433 * @param commit the iterate commit
434 * @return count of commit entries
435 */
436 protected int iterate(Commit<Void> commit) {
437 int count = 0;
438 for (Map.Entry<String, MapEntryValue> entry : backingMap.entrySet()) {
439 for (byte[] value : entry.getValue().values()) {
440 commit.session().publish(ENTRY, serializer::encode, Maps.immutableEntry(entry.getKey(), value));
441 count++;
442 }
443 }
444 return count;
445 }
446
447 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700448 * Publishes events to listeners.
449 *
450 * @param events list of map event to publish
451 */
452 private void publish(List<MultimapEvent<String, byte[]>> events) {
453 listeners.values().forEach(session -> session.publish(CHANGE, serializer::encode, events));
454 }
455
456 private interface MapEntryValue {
457
458 /**
459 * Returns the list of raw {@code byte[]'s}.
460 *
461 * @return list of raw values
462 */
463 Collection<? extends byte[]> values();
464
465 /**
466 * Returns the version of the value.
467 *
468 * @return version
469 */
470 long version();
471
472 /**
473 * Add a new commit and modifies the set of values accordingly.
474 * In the case of a replace or removeAll it returns the set of removed
475 * values. In the case of put or multiRemove it returns null for no
476 * change and a set of the added or removed values respectively if a
477 * change resulted.
478 *
479 * @param commit the commit to be added
480 */
481 Versioned<Collection<? extends byte[]>> addCommit(
482 Commit<? extends MultimapOperation> commit);
483 }
484
485 private class NonTransactionalCommit implements MapEntryValue {
486 private long version;
487 private final TreeSet<byte[]> valueSet = Sets.newTreeSet(new ByteArrayComparator());
488
489 public NonTransactionalCommit() {
490 //Set the version to current it will only be updated once this is
491 // populated
492 this.version = globalVersion.get();
493 }
494
495 @Override
496 public Collection<? extends byte[]> values() {
497 return ImmutableSet.copyOf(valueSet);
498 }
499
500 @Override
501 public long version() {
502 return version;
503 }
504
505 @Override
506 public Versioned<Collection<? extends byte[]>> addCommit(
507 Commit<? extends MultimapOperation> commit) {
508 Preconditions.checkNotNull(commit);
509 Preconditions.checkNotNull(commit.value());
510 Versioned<Collection<? extends byte[]>> retVersion;
511
512 if (commit.value() instanceof Put) {
513 //Using a treeset here sanitizes the input, removing duplicates
514 Set<byte[]> valuesToAdd =
515 Sets.newTreeSet(new ByteArrayComparator());
516 ((Put) commit.value()).values().forEach(value -> {
517 if (!valueSet.contains(value)) {
518 valuesToAdd.add(value);
519 }
520 });
521 if (valuesToAdd.isEmpty()) {
522 //Do not increment or add the commit if no change resulted
523 return null;
524 }
525 retVersion = new Versioned<>(valuesToAdd, version);
526 valuesToAdd.forEach(value -> valueSet.add(value));
527 version++;
528 return retVersion;
529
530 } else if (commit.value() instanceof Replace) {
531 //Will this work?? Need to check before check-in!
532 Set<byte[]> removedValues = Sets.newHashSet();
533 removedValues.addAll(valueSet);
534 retVersion = new Versioned<>(removedValues, version);
535 valueSet.clear();
536 Set<byte[]> valuesToAdd =
537 Sets.newTreeSet(new ByteArrayComparator());
538 ((Replace) commit.value()).values().forEach(value -> {
539 valuesToAdd.add(value);
540 });
541 if (valuesToAdd.isEmpty()) {
542 version = globalVersion.incrementAndGet();
543 backingMap.remove(((Replace) commit.value()).key());
544 return retVersion;
545 }
546 valuesToAdd.forEach(value -> valueSet.add(value));
547 version = globalVersion.incrementAndGet();
548 return retVersion;
549
550 } else if (commit.value() instanceof RemoveAll) {
551 Set<byte[]> removed = Sets.newHashSet();
552 //We can assume here that values only appear once and so we
553 //do not need to sanitize the return for duplicates.
554 removed.addAll(valueSet);
555 retVersion = new Versioned<>(removed, version);
556 valueSet.clear();
557 //In the case of a removeAll all commits will be removed and
558 //unlike the multiRemove case we do not need to consider
559 //dependencies among additive and removal commits.
560
561 //Save the key for use after the commit is closed
562 String key = ((RemoveAll) commit.value()).key();
563 version = globalVersion.incrementAndGet();
564 backingMap.remove(key);
565 return retVersion;
566
567 } else if (commit.value() instanceof MultiRemove) {
568 //Must first calculate how many commits the removal depends on.
569 //At this time we also sanitize the removal set by adding to a
570 //set with proper handling of byte[] equality.
571 Set<byte[]> removed = Sets.newHashSet();
572 ((MultiRemove) commit.value()).values().forEach(value -> {
573 if (valueSet.contains(value)) {
574 removed.add(value);
575 }
576 });
577 //If there is nothing to be removed no action should be taken.
578 if (removed.isEmpty()) {
579 return null;
580 }
581 //Save key in case countdown results in closing the commit.
582 String removedKey = ((MultiRemove) commit.value()).key();
583 removed.forEach(removedValue -> {
584 valueSet.remove(removedValue);
585 });
586 //The version is updated locally as well as globally even if
587 //this object will be removed from the map in case any other
588 //party still holds a reference to this object.
589 retVersion = new Versioned<>(removed, version);
590 version = globalVersion.incrementAndGet();
591 if (valueSet.isEmpty()) {
592 backingMap.remove(removedKey);
593 }
594 return retVersion;
595
596 } else {
597 throw new IllegalArgumentException();
598 }
599 }
600 }
601
602 /**
603 * A collector that creates MapEntryValues and creates a multiset of all
604 * values in the map an equal number of times to the number of sets in
605 * which they participate.
606 */
607 private class HashMultisetValueCollector implements
608 Collector<MapEntryValue,
609 HashMultiset<byte[]>,
610 HashMultiset<byte[]>> {
611
612 @Override
613 public Supplier<HashMultiset<byte[]>> supplier() {
614 return HashMultiset::create;
615 }
616
617 @Override
618 public BiConsumer<HashMultiset<byte[]>, MapEntryValue> accumulator() {
619 return (multiset, mapEntryValue) ->
620 multiset.addAll(mapEntryValue.values());
621 }
622
623 @Override
624 public BinaryOperator<HashMultiset<byte[]>> combiner() {
625 return (setOne, setTwo) -> {
626 setOne.addAll(setTwo);
627 return setOne;
628 };
629 }
630
631 @Override
632 public Function<HashMultiset<byte[]>,
633 HashMultiset<byte[]>> finisher() {
634 return Function.identity();
635 }
636
637 @Override
638 public Set<Characteristics> characteristics() {
639 return EnumSet.of(Characteristics.UNORDERED);
640 }
641 }
642
643 /**
644 * A collector that creates Entries of {@code <String, MapEntryValue>} and
645 * creates a set of entries all key value pairs in the map.
646 */
647 private class EntrySetCollector implements
648 Collector<Map.Entry<String, MapEntryValue>,
649 Set<Map.Entry<String, byte[]>>,
650 Set<Map.Entry<String, byte[]>>> {
651 private Set<Map.Entry<String, byte[]>> set = null;
652
653 @Override
654 public Supplier<Set<Map.Entry<String, byte[]>>> supplier() {
655 return () -> {
656 if (set == null) {
657 set = Sets.newHashSet();
658 }
659 return set;
660 };
661 }
662
663 @Override
664 public BiConsumer<Set<Map.Entry<String, byte[]>>,
665 Map.Entry<String, MapEntryValue>> accumulator() {
666 return (set, entry) -> {
667 entry
668 .getValue()
669 .values()
670 .forEach(byteValue ->
671 set.add(Maps.immutableEntry(entry.getKey(),
672 byteValue)));
673 };
674 }
675
676 @Override
677 public BinaryOperator<Set<Map.Entry<String, byte[]>>> combiner() {
678 return (setOne, setTwo) -> {
679 setOne.addAll(setTwo);
680 return setOne;
681 };
682 }
683
684 @Override
685 public Function<Set<Map.Entry<String, byte[]>>,
686 Set<Map.Entry<String, byte[]>>> finisher() {
687 return (unused) -> set;
688 }
689
690 @Override
691 public Set<Characteristics> characteristics() {
692 return EnumSet.of(Characteristics.UNORDERED);
693 }
694 }
695 /**
696 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
697 * @param value map entry value
698 * @return versioned instance or an empty list versioned -1 if argument is
699 * null
700 */
701 private Versioned<Collection<? extends byte[]>> toVersioned(
702 MapEntryValue value) {
703 return value == null ? new Versioned<>(Lists.newArrayList(), -1) :
704 new Versioned<>(value.values(),
705 value.version());
706 }
707
708 private static class ByteArrayComparator implements Comparator<byte[]> {
709
710 @Override
711 public int compare(byte[] o1, byte[] o2) {
712 if (Arrays.equals(o1, o2)) {
713 return 0;
714 } else {
715 for (int i = 0; i < o1.length && i < o2.length; i++) {
716 if (o1[i] < o2[i]) {
717 return -1;
718 } else if (o1[i] > o2[i]) {
719 return 1;
720 }
721 }
722 return o1.length > o2.length ? 1 : -1;
723 }
724 }
725 }
726}