blob: 636e261bcbd8eda1111acb04a21db7730c56ead3 [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import com.google.common.base.Preconditions;
20import com.google.common.collect.HashMultimap;
21import com.google.common.collect.HashMultiset;
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070022import com.google.common.collect.ImmutableSet;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -070023import com.google.common.collect.Lists;
24import com.google.common.collect.Maps;
25import com.google.common.collect.Multiset;
26import com.google.common.collect.Sets;
27import io.atomix.copycat.server.Commit;
28import io.atomix.copycat.server.Snapshottable;
29import io.atomix.copycat.server.StateMachineExecutor;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -070030import io.atomix.copycat.server.session.SessionListener;
31import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
32import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
33import io.atomix.resource.ResourceStateMachine;
34import org.onlab.util.CountDownCompleter;
35import org.onlab.util.Match;
36import org.onosproject.store.service.Versioned;
37import org.slf4j.Logger;
38
39import java.util.Arrays;
40import java.util.Collection;
41import java.util.Comparator;
42import java.util.EnumSet;
43import java.util.Map;
44import java.util.Properties;
45import java.util.Set;
46import java.util.TreeMap;
47import java.util.concurrent.atomic.AtomicLong;
48import java.util.function.BiConsumer;
49import java.util.function.BinaryOperator;
50import java.util.function.Function;
51import java.util.function.Supplier;
52import java.util.stream.Collector;
53import java.util.stream.Collectors;
54
Aaron Kruglikova1801aa2016-07-12 15:17:30 -070055import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Clear;
56import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.ContainsEntry;
57import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.ContainsKey;
58import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.ContainsValue;
59import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Entries;
60import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Get;
61import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.IsEmpty;
62import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.KeySet;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Keys;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultiRemove;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultimapCommand;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Put;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.RemoveAll;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Replace;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Size;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Values;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -070071import static org.slf4j.LoggerFactory.getLogger;
72
/**
 * State Machine for {@link AtomixConsistentSetMultimap} resource.
 */
Aaron Kruglikova1801aa2016-07-12 15:17:30 -070076public class AtomixConsistentSetMultimapState extends ResourceStateMachine
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -070077 implements SessionListener, Snapshottable {
78
private final Logger log = getLogger(getClass());
// Source of the version numbers stamped on entries; starts at 1.
private final AtomicLong globalVersion = new AtomicLong(1);
//TODO Add listener map here
// Key -> holder of that key's current values and version.
private final Map<String, MapEntryValue> backingMap = Maps.newHashMap();

/**
 * Creates the state machine, forwarding the properties to the parent
 * {@link ResourceStateMachine}.
 *
 * @param properties properties handed to the superclass constructor
 */
public AtomixConsistentSetMultimapState(Properties properties) {
    super(properties);
}
87
88 @Override
89 public void snapshot(SnapshotWriter writer) {
90 }
91
92 @Override
93 public void install(SnapshotReader reader) {
94 }
95
96 @Override
97 protected void configure(StateMachineExecutor executor) {
98 executor.register(Size.class, this::size);
99 executor.register(IsEmpty.class, this::isEmpty);
100 executor.register(ContainsKey.class, this::containsKey);
101 executor.register(ContainsValue.class, this::containsValue);
102 executor.register(ContainsEntry.class, this::containsEntry);
103 executor.register(Clear.class, this::clear);
104 executor.register(KeySet.class, this::keySet);
105 executor.register(Keys.class, this::keys);
106 executor.register(Values.class, this::values);
107 executor.register(Entries.class, this::entries);
108 executor.register(Get.class, this::get);
109 executor.register(RemoveAll.class, this::removeAll);
110 executor.register(MultiRemove.class, this::multiRemove);
111 executor.register(Put.class, this::put);
112 executor.register(Replace.class, this::replace);
113 }
114
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700115 /**
116 * Handles a Size commit.
117 *
118 * @param commit Size commit
119 * @return number of unique key value pairs in the multimap
120 */
121 protected int size(Commit<? extends Size> commit) {
122 try {
123 return backingMap.values()
124 .stream()
125 .map(valueCollection -> valueCollection.values().size())
126 .collect(Collectors.summingInt(size -> size));
127 } finally {
128 commit.close();
129 }
130 }
131
132 /**
133 * Handles an IsEmpty commit.
134 *
135 * @param commit IsEmpty commit
136 * @return true if the multimap contains no key-value pairs, else false
137 */
138 protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
139 try {
140 return backingMap.isEmpty();
141 } finally {
142 commit.close();
143 }
144 }
145
146 /**
147 * Handles a contains key commit.
148 *
149 * @param commit ContainsKey commit
150 * @return returns true if the key is in the multimap, else false
151 */
152 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
153 try {
154 return backingMap.containsKey(commit.operation().key());
155 } finally {
156 commit.close();
157 }
158 }
159
160 /**
161 * Handles a ContainsValue commit.
162 *
163 * @param commit ContainsValue commit
164 * @return true if the value is in the multimap, else false
165 */
166 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
167 try {
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700168 if (backingMap.values().isEmpty()) {
169 return false;
170 }
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700171 Match<byte[]> match = Match.ifValue(commit.operation().value());
172 return backingMap
173 .values()
174 .stream()
175 .anyMatch(valueList ->
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700176 valueList
177 .values()
178 .stream()
179 .anyMatch(byteValue ->
180 match.matches(byteValue)));
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700181 } finally {
182 commit.close();
183 }
184 }
185
186 /**
187 * Handles a ContainsEntry commit.
188 *
189 * @param commit ContainsEntry commit
190 * @return true if the key-value pair exists, else false
191 */
192 protected boolean containsEntry(Commit<? extends ContainsEntry> commit) {
193 try {
194 MapEntryValue entryValue =
195 backingMap.get(commit.operation().key());
196 if (entryValue == null) {
197 return false;
198 } else {
199 Match valueMatch = Match.ifValue(commit.operation().value());
200 return entryValue
201 .values()
202 .stream()
203 .anyMatch(byteValue -> valueMatch.matches(byteValue));
204 }
205 } finally {
206 commit.close();
207 }
208 }
209
210 /**
211 * Handles a Clear commit.
212 *
213 * @param commit Clear commit
214 */
215 protected void clear(Commit<? extends Clear> commit) {
216 try {
217 backingMap.clear();
218 } finally {
219 commit.close();
220 }
221 }
222
223 /**
224 * Handles a KeySet commit.
225 *
226 * @param commit KeySet commit
227 * @return a set of all keys in the multimap
228 */
229 protected Set<String> keySet(Commit<? extends KeySet> commit) {
230 try {
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700231 return ImmutableSet.copyOf(backingMap.keySet());
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700232 } finally {
233 commit.close();
234 }
235 }
236
237 /**
238 * Handles a Keys commit.
239 *
240 * @param commit Keys commit
241 * @return a multiset of keys with each key included an equal number of
242 * times to the total key-value pairs in which that key participates
243 */
244 protected Multiset<String> keys(Commit<? extends Keys> commit) {
245 try {
246 Multiset keys = HashMultiset.create();
247 backingMap.forEach((key, mapEntryValue) -> {
248 keys.add(key, mapEntryValue.values().size());
249 });
250 return keys;
251 } finally {
252 commit.close();
253 }
254 }
255
256 /**
257 * Handles a Values commit.
258 *
259 * @param commit Values commit
260 * @return the set of values in the multimap with duplicates included
261 */
262 protected Multiset<byte[]> values(Commit<? extends Values> commit) {
263 try {
264 return backingMap
265 .values()
266 .stream()
267 .collect(new HashMultisetValueCollector());
268 } finally {
269 commit.close();
270 }
271 }
272
273 /**
274 * Handles an Entries commit.
275 *
276 * @param commit Entries commit
277 * @return a set of all key-value pairs in the multimap
278 */
279 protected Collection<Map.Entry<String, byte[]>> entries(
280 Commit<? extends Entries> commit) {
281 try {
282 return backingMap
283 .entrySet()
284 .stream()
285 .collect(new EntrySetCollector());
286 } finally {
287 commit.close();
288 }
289 }
290
291 /**
292 * Handles a Get commit.
293 *
294 * @param commit Get commit
295 * @return the collection of values associated with the key or an empty
296 * list if none exist
297 */
298 protected Versioned<Collection<? extends byte[]>> get(
299 Commit<? extends Get> commit) {
300 try {
301 MapEntryValue mapEntryValue = backingMap.get(commit.operation().key());
302 return toVersioned(backingMap.get(commit.operation().key()));
303 } finally {
304 commit.close();
305 }
306 }
307
308 /**
309 * Handles a removeAll commit, and returns the previous mapping.
310 *
311 * @param commit removeAll commit
312 * @return collection of removed values
313 */
314 protected Versioned<Collection<? extends byte[]>> removeAll(
315 Commit<? extends RemoveAll> commit) {
316 if (!backingMap.containsKey(commit.operation().key())) {
317 commit.close();
318 return new Versioned<>(Sets.newHashSet(), -1);
319 } else {
320 return backingMap.get(commit.operation().key()).addCommit(commit);
321 }
322 }
323
324 /**
325 * Handles a multiRemove commit, returns true if the remove results in any
326 * change.
327 * @param commit multiRemove commit
328 * @return true if any change results, else false
329 */
330 protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
331 if (!backingMap.containsKey(commit.operation().key())) {
332 commit.close();
333 return false;
334 } else {
335 return (backingMap
336 .get(commit.operation().key())
337 .addCommit(commit)) != null;
338 }
339 }
340
341 /**
342 * Handles a put commit, returns true if any change results from this
343 * commit.
344 * @param commit a put commit
345 * @return true if this commit results in a change, else false
346 */
347 protected boolean put(Commit<? extends Put> commit) {
348 if (commit.operation().values().isEmpty()) {
349 return false;
350 }
351 if (!backingMap.containsKey(commit.operation().key())) {
352 backingMap.put(commit.operation().key(),
353 new NonTransactionalCommit(1));
354 }
355 return backingMap
356 .get(commit.operation().key())
357 .addCommit(commit) != null;
358 }
359
360 protected Versioned<Collection<? extends byte[]>> replace(
361 Commit<? extends Replace> commit) {
362 if (!backingMap.containsKey(commit.operation().key())) {
363 backingMap.put(commit.operation().key(),
364 new NonTransactionalCommit(1));
365 }
366 return backingMap.get(commit.operation().key()).addCommit(commit);
367 }
368
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700369 private interface MapEntryValue {
370
371 /**
372 * Returns the list of raw {@code byte[]'s}.
373 *
374 * @return list of raw values
375 */
376 Collection<? extends byte[]> values();
377
378 /**
379 * Returns the version of the value.
380 *
381 * @return version
382 */
383 long version();
384
385 /**
386 * Discards the value by invoke appropriate clean up actions.
387 */
388 void discard();
389
390 /**
391 * Add a new commit and modifies the set of values accordingly.
392 * In the case of a replace or removeAll it returns the set of removed
393 * values. In the case of put or multiRemove it returns null for no
394 * change and a set of the added or removed values respectively if a
395 * change resulted.
396 *
397 * @param commit the commit to be added
398 */
399 Versioned<Collection<? extends byte[]>> addCommit(
400 Commit<? extends MultimapCommand> commit);
401 }
402
403 private class NonTransactionalCommit implements MapEntryValue {
404 private long version;
405 private final TreeMap<byte[], CountDownCompleter<Commit>>
406 valueCountdownMap = Maps.newTreeMap(new ByteArrayComparator());
407 /*This is a mapping of commits that added values to the commits
408 * removing those values, they will not be circular because keys will
409 * be exclusively Put and Replace commits and values will be exclusively
410 * Multiremove commits, each time a Put or replace is removed it should
411 * as part of closing go through and countdown each of the remove
412 * commits depending on it.*/
413 private final HashMultimap<Commit, CountDownCompleter<Commit>>
414 additiveToRemovalCommits = HashMultimap.create();
415
416 public NonTransactionalCommit(
417 long version) {
418 //Set the version to current it will only be updated once this is
419 // populated
420 this.version = globalVersion.get();
421 }
422
423 @Override
424 public Collection<? extends byte[]> values() {
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700425 return ImmutableSet.copyOf(valueCountdownMap.keySet());
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700426 }
427
428 @Override
429 public long version() {
430 return version;
431 }
432
433 @Override
434 public void discard() {
435 valueCountdownMap.values().forEach(completer ->
436 completer.object().close());
437 }
438
439 @Override
440 public Versioned<Collection<? extends byte[]>> addCommit(
441 Commit<? extends MultimapCommand> commit) {
442 Preconditions.checkNotNull(commit);
443 Preconditions.checkNotNull(commit.operation());
444 Versioned<Collection<? extends byte[]>> retVersion;
445
446 if (commit.operation() instanceof Put) {
447 //Using a treeset here sanitizes the input, removing duplicates
448 Set<byte[]> valuesToAdd =
449 Sets.newTreeSet(new ByteArrayComparator());
450 ((Put) commit.operation()).values().forEach(value -> {
451 if (!valueCountdownMap.containsKey(value)) {
452 valuesToAdd.add(value);
453 }
454 });
455 if (valuesToAdd.isEmpty()) {
456 //Do not increment or add the commit if no change resulted
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700457 commit.close();
458 return null;
459 }
460 //When all values from a commit have been removed decrement all
461 //removal commits relying on it and remove itself from the
462 //mapping of additive commits to the commits removing the
463 //values it added. (Only multiremoves will be dependent)
464 CountDownCompleter<Commit> completer =
465 new CountDownCompleter<>(commit, valuesToAdd.size(),
466 c -> {
467 if (additiveToRemovalCommits.containsKey(c)) {
468 additiveToRemovalCommits.
469 get(c).
470 forEach(countdown ->
471 countdown.countDown());
472 additiveToRemovalCommits.removeAll(c);
473 }
474 c.close();
475 });
476 retVersion = new Versioned<>(valuesToAdd, version);
477 valuesToAdd.forEach(value -> valueCountdownMap.put(value,
478 completer));
479 version++;
480 return retVersion;
481
482 } else if (commit.operation() instanceof Replace) {
483 //Will this work?? Need to check before check-in!
484 Set<byte[]> removedValues = Sets.newHashSet();
485 removedValues.addAll(valueCountdownMap.keySet());
486 retVersion = new Versioned<>(removedValues, version);
487 valueCountdownMap.values().forEach(countdown ->
488 countdown.countDown());
489 valueCountdownMap.clear();
490 Set<byte[]> valuesToAdd =
491 Sets.newTreeSet(new ByteArrayComparator());
492 ((Replace) commit.operation()).values().forEach(value -> {
493 valuesToAdd.add(value);
494 });
495 if (valuesToAdd.isEmpty()) {
496 version = globalVersion.incrementAndGet();
497 backingMap.remove(((Replace) commit.operation()).key());
498 //Order is important here, the commit must be closed last
499 //(or minimally after all uses)
500 commit.close();
501 return retVersion;
502 }
503 CountDownCompleter<Commit> completer =
504 new CountDownCompleter<>(commit, valuesToAdd.size(),
505 c -> {
506 if (additiveToRemovalCommits
507 .containsKey(c)) {
508 additiveToRemovalCommits.
509 get(c).
510 forEach(countdown ->
511 countdown.countDown());
512 additiveToRemovalCommits.
513 removeAll(c);
514 }
515 c.close();
516 });
517 valuesToAdd.forEach(value ->
518 valueCountdownMap.put(value, completer));
519 version = globalVersion.incrementAndGet();
520 return retVersion;
521
522 } else if (commit.operation() instanceof RemoveAll) {
523 Set<byte[]> removed = Sets.newHashSet();
524 //We can assume here that values only appear once and so we
525 //do not need to sanitize the return for duplicates.
526 removed.addAll(valueCountdownMap.keySet());
527 retVersion = new Versioned<>(removed, version);
528 valueCountdownMap.values().forEach(countdown ->
529 countdown.countDown());
530 valueCountdownMap.clear();
531 //In the case of a removeAll all commits will be removed and
532 //unlike the multiRemove case we do not need to consider
533 //dependencies among additive and removal commits.
534
535 //Save the key for use after the commit is closed
536 String key = ((RemoveAll) commit.operation()).key();
537 commit.close();
538 version = globalVersion.incrementAndGet();
539 backingMap.remove(key);
540 return retVersion;
541
542 } else if (commit.operation() instanceof MultiRemove) {
543 //Must first calculate how many commits the removal depends on.
544 //At this time we also sanitize the removal set by adding to a
545 //set with proper handling of byte[] equality.
546 Set<byte[]> removed = Sets.newHashSet();
547 Set<Commit> commitsRemovedFrom = Sets.newHashSet();
548 ((MultiRemove) commit.operation()).values().forEach(value -> {
549 if (valueCountdownMap.containsKey(value)) {
550 removed.add(value);
551 commitsRemovedFrom
552 .add(valueCountdownMap.get(value).object());
553 }
554 });
555 //If there is nothing to be removed no action should be taken.
556 if (removed.isEmpty()) {
557 //Do not increment or add the commit if no change resulted
558 commit.close();
559 return null;
560 }
561 //When all additive commits this depends on are closed this can
562 //be closed as well.
563 CountDownCompleter<Commit> completer =
564 new CountDownCompleter<>(commit,
565 commitsRemovedFrom.size(),
566 c -> c.close());
567 commitsRemovedFrom.forEach(commitRemovedFrom -> {
568 additiveToRemovalCommits.put(commitRemovedFrom, completer);
569 });
570 //Save key in case countdown results in closing the commit.
571 String removedKey = ((MultiRemove) commit.operation()).key();
572 removed.forEach(removedValue -> {
573 valueCountdownMap.remove(removedValue).countDown();
574 });
575 //The version is updated locally as well as globally even if
576 //this object will be removed from the map in case any other
577 //party still holds a reference to this object.
578 retVersion = new Versioned<>(removed, version);
579 version = globalVersion.incrementAndGet();
580 if (valueCountdownMap.isEmpty()) {
581 backingMap
582 .remove(removedKey);
583 }
584 return retVersion;
585
586 } else {
587 throw new IllegalArgumentException();
588 }
589 }
590 }
591
592 /**
593 * A collector that creates MapEntryValues and creates a multiset of all
594 * values in the map an equal number of times to the number of sets in
595 * which they participate.
596 */
597 private class HashMultisetValueCollector implements
598 Collector<MapEntryValue,
599 HashMultiset<byte[]>,
600 HashMultiset<byte[]>> {
601 private HashMultiset<byte[]> multiset = null;
602
603 @Override
604 public Supplier<HashMultiset<byte[]>> supplier() {
Sho SHIMIZU99e90cd2016-08-12 15:55:16 -0700605 return () -> {
606 if (multiset == null) {
607 multiset = HashMultiset.create();
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700608 }
Sho SHIMIZU99e90cd2016-08-12 15:55:16 -0700609 return multiset;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700610 };
611 }
612
613 @Override
614 public BiConsumer<HashMultiset<byte[]>, MapEntryValue> accumulator() {
615 return (multiset, mapEntryValue) ->
616 multiset.addAll(mapEntryValue.values());
617 }
618
619 @Override
620 public BinaryOperator<HashMultiset<byte[]>> combiner() {
621 return (setOne, setTwo) -> {
622 setOne.addAll(setTwo);
623 return setOne;
624 };
625 }
626
627 @Override
628 public Function<HashMultiset<byte[]>,
629 HashMultiset<byte[]>> finisher() {
630 return (unused) -> multiset;
631 }
632
633 @Override
634 public Set<Characteristics> characteristics() {
635 return EnumSet.of(Characteristics.UNORDERED);
636 }
637 }
638
639 /**
640 * A collector that creates Entries of {@code <String, MapEntryValue>} and
641 * creates a set of entries all key value pairs in the map.
642 */
643 private class EntrySetCollector implements
644 Collector<Map.Entry<String, MapEntryValue>,
645 Set<Map.Entry<String, byte[]>>,
646 Set<Map.Entry<String, byte[]>>> {
647 private Set<Map.Entry<String, byte[]>> set = null;
648
649 @Override
650 public Supplier<Set<Map.Entry<String, byte[]>>> supplier() {
Sho SHIMIZU99e90cd2016-08-12 15:55:16 -0700651 return () -> {
652 if (set == null) {
653 set = Sets.newHashSet();
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700654 }
Sho SHIMIZU99e90cd2016-08-12 15:55:16 -0700655 return set;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700656 };
657 }
658
659 @Override
660 public BiConsumer<Set<Map.Entry<String, byte[]>>,
661 Map.Entry<String, MapEntryValue>> accumulator() {
662 return (set, entry) -> {
663 entry
664 .getValue()
665 .values()
666 .forEach(byteValue ->
667 set.add(Maps.immutableEntry(entry.getKey(),
668 byteValue)));
669 };
670 }
671
672 @Override
673 public BinaryOperator<Set<Map.Entry<String, byte[]>>> combiner() {
674 return (setOne, setTwo) -> {
675 setOne.addAll(setTwo);
676 return setOne;
677 };
678 }
679
680 @Override
681 public Function<Set<Map.Entry<String, byte[]>>,
682 Set<Map.Entry<String, byte[]>>> finisher() {
683 return (unused) -> set;
684 }
685
686 @Override
687 public Set<Characteristics> characteristics() {
688 return EnumSet.of(Characteristics.UNORDERED);
689 }
690 }
691 /**
692 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
693 * @param value map entry value
694 * @return versioned instance or an empty list versioned -1 if argument is
695 * null
696 */
697 private Versioned<Collection<? extends byte[]>> toVersioned(
698 MapEntryValue value) {
699 return value == null ? new Versioned<>(Lists.newArrayList(), -1) :
700 new Versioned<>(value.values(),
701 value.version());
702 }
703
704 private class ByteArrayComparator implements Comparator<byte[]> {
705
706 @Override
707 public int compare(byte[] o1, byte[] o2) {
708 if (Arrays.equals(o1, o2)) {
709 return 0;
710 } else {
711 for (int i = 0; i < o1.length && i < o2.length; i++) {
712 if (o1[i] < o2[i]) {
713 return -1;
714 } else if (o1[i] > o2[i]) {
715 return 1;
716 }
717 }
718 return o1.length > o2.length ? 1 : -1;
719 }
720 }
721 }
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700722 }