blob: 3029bd86ebd6df78e304016d279bcea01636f363 [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import java.util.Arrays;
20import java.util.Collection;
21import java.util.Comparator;
22import java.util.EnumSet;
23import java.util.HashMap;
24import java.util.LinkedHashMap;
25import java.util.List;
26import java.util.Map;
27import java.util.Set;
28import java.util.TreeSet;
29import java.util.concurrent.atomic.AtomicLong;
30import java.util.function.BiConsumer;
31import java.util.function.BinaryOperator;
32import java.util.function.Function;
33import java.util.function.Supplier;
34import java.util.stream.Collector;
35import java.util.stream.Collectors;
36
37import com.esotericsoftware.kryo.Kryo;
38import com.esotericsoftware.kryo.io.Input;
39import com.esotericsoftware.kryo.io.Output;
40import com.google.common.base.Preconditions;
41import com.google.common.collect.HashMultiset;
42import com.google.common.collect.ImmutableSet;
43import com.google.common.collect.Lists;
44import com.google.common.collect.Maps;
45import com.google.common.collect.Multiset;
46import com.google.common.collect.Sets;
47import io.atomix.protocols.raft.service.AbstractRaftService;
48import io.atomix.protocols.raft.service.Commit;
49import io.atomix.protocols.raft.service.RaftServiceExecutor;
50import io.atomix.protocols.raft.session.RaftSession;
51import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
52import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
53import org.onlab.util.KryoNamespace;
54import org.onlab.util.Match;
55import org.onosproject.store.serializers.KryoNamespaces;
56import org.onosproject.store.service.MultimapEvent;
57import org.onosproject.store.service.Serializer;
58import org.onosproject.store.service.Versioned;
59
60import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
61import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
62import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsEntry;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsKey;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsValue;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ENTRIES;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
75import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
76import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
77import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
78import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
79import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
80import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
81import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
82import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
83import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
84import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Replace;
85import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.SIZE;
86import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.VALUES;
87
88/**
89 * State Machine for {@link AtomixConsistentSetMultimap} resource.
90 */
91public class AtomixConsistentSetMultimapService extends AbstractRaftService {
92
93 private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
94 .register(KryoNamespaces.BASIC)
95 .register(AtomixConsistentSetMultimapOperations.NAMESPACE)
96 .register(AtomixConsistentSetMultimapEvents.NAMESPACE)
97 .register(ByteArrayComparator.class)
98 .register(new HashMap().keySet().getClass())
99 .register(TreeSet.class)
100 .register(new com.esotericsoftware.kryo.Serializer<NonTransactionalCommit>() {
101 @Override
102 public void write(Kryo kryo, Output output, NonTransactionalCommit object) {
103 kryo.writeClassAndObject(output, object.valueSet);
104 }
105
106 @Override
107 @SuppressWarnings("unchecked")
108 public NonTransactionalCommit read(Kryo kryo, Input input, Class<NonTransactionalCommit> type) {
109 NonTransactionalCommit commit = new NonTransactionalCommit();
110 commit.valueSet.addAll((Collection<byte[]>) kryo.readClassAndObject(input));
111 return commit;
112 }
113 }, NonTransactionalCommit.class)
114 .build());
115
116 private AtomicLong globalVersion = new AtomicLong(1);
117 private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
118 private Map<String, MapEntryValue> backingMap = Maps.newHashMap();
119
120 @Override
121 public void snapshot(SnapshotWriter writer) {
122 writer.writeLong(globalVersion.get());
123 writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer::encode);
124 writer.writeObject(backingMap, serializer::encode);
125 }
126
127 @Override
128 public void install(SnapshotReader reader) {
129 globalVersion = new AtomicLong(reader.readLong());
130
131 listeners = new LinkedHashMap<>();
132 for (Long sessionId : reader.<Set<Long>>readObject(serializer::decode)) {
133 listeners.put(sessionId, getSessions().getSession(sessionId));
134 }
135
136 backingMap = reader.readObject(serializer::decode);
137 }
138
139 @Override
140 protected void configure(RaftServiceExecutor executor) {
141 executor.register(SIZE, this::size, serializer::encode);
142 executor.register(IS_EMPTY, this::isEmpty, serializer::encode);
143 executor.register(CONTAINS_KEY, serializer::decode, this::containsKey, serializer::encode);
144 executor.register(CONTAINS_VALUE, serializer::decode, this::containsValue, serializer::encode);
145 executor.register(CONTAINS_ENTRY, serializer::decode, this::containsEntry, serializer::encode);
146 executor.register(CLEAR, this::clear);
147 executor.register(KEY_SET, this::keySet, serializer::encode);
148 executor.register(KEYS, this::keys, serializer::encode);
149 executor.register(VALUES, this::values, serializer::encode);
150 executor.register(ENTRIES, this::entries, serializer::encode);
151 executor.register(GET, serializer::decode, this::get, serializer::encode);
152 executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode);
153 executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode);
154 executor.register(PUT, serializer::decode, this::put, serializer::encode);
155 executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
156 executor.register(ADD_LISTENER, this::listen);
157 executor.register(REMOVE_LISTENER, this::unlisten);
158 }
159
160 @Override
161 public void onExpire(RaftSession session) {
162 listeners.remove(session.sessionId().id());
163 }
164
165 @Override
166 public void onClose(RaftSession session) {
167 listeners.remove(session.sessionId().id());
168 }
169
170 /**
171 * Handles a Size commit.
172 *
173 * @param commit Size commit
174 * @return number of unique key value pairs in the multimap
175 */
176 protected int size(Commit<Void> commit) {
177 return backingMap.values()
178 .stream()
179 .map(valueCollection -> valueCollection.values().size())
180 .collect(Collectors.summingInt(size -> size));
181 }
182
183 /**
184 * Handles an IsEmpty commit.
185 *
186 * @param commit IsEmpty commit
187 * @return true if the multimap contains no key-value pairs, else false
188 */
189 protected boolean isEmpty(Commit<Void> commit) {
190 return backingMap.isEmpty();
191 }
192
193 /**
194 * Handles a contains key commit.
195 *
196 * @param commit ContainsKey commit
197 * @return returns true if the key is in the multimap, else false
198 */
199 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
200 return backingMap.containsKey(commit.value().key());
201 }
202
203 /**
204 * Handles a ContainsValue commit.
205 *
206 * @param commit ContainsValue commit
207 * @return true if the value is in the multimap, else false
208 */
209 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
210 if (backingMap.values().isEmpty()) {
211 return false;
212 }
213 Match<byte[]> match = Match.ifValue(commit.value().value());
214 return backingMap
215 .values()
216 .stream()
217 .anyMatch(valueList ->
218 valueList
219 .values()
220 .stream()
221 .anyMatch(byteValue ->
222 match.matches(byteValue)));
223 }
224
225 /**
226 * Handles a ContainsEntry commit.
227 *
228 * @param commit ContainsEntry commit
229 * @return true if the key-value pair exists, else false
230 */
231 protected boolean containsEntry(Commit<? extends ContainsEntry> commit) {
232 MapEntryValue entryValue =
233 backingMap.get(commit.value().key());
234 if (entryValue == null) {
235 return false;
236 } else {
237 Match valueMatch = Match.ifValue(commit.value().value());
238 return entryValue
239 .values()
240 .stream()
241 .anyMatch(byteValue -> valueMatch.matches(byteValue));
242 }
243 }
244
245 /**
246 * Handles a Clear commit.
247 *
248 * @param commit Clear commit
249 */
250 protected void clear(Commit<Void> commit) {
251 backingMap.clear();
252 }
253
254 /**
255 * Handles a KeySet commit.
256 *
257 * @param commit KeySet commit
258 * @return a set of all keys in the multimap
259 */
260 protected Set<String> keySet(Commit<Void> commit) {
261 return ImmutableSet.copyOf(backingMap.keySet());
262 }
263
264 /**
265 * Handles a Keys commit.
266 *
267 * @param commit Keys commit
268 * @return a multiset of keys with each key included an equal number of
269 * times to the total key-value pairs in which that key participates
270 */
271 protected Multiset<String> keys(Commit<Void> commit) {
272 Multiset keys = HashMultiset.create();
273 backingMap.forEach((key, mapEntryValue) -> {
274 keys.add(key, mapEntryValue.values().size());
275 });
276 return keys;
277 }
278
279 /**
280 * Handles a Values commit.
281 *
282 * @param commit Values commit
283 * @return the set of values in the multimap with duplicates included
284 */
285 protected Multiset<byte[]> values(Commit<Void> commit) {
286 return backingMap
287 .values()
288 .stream()
289 .collect(new HashMultisetValueCollector());
290 }
291
292 /**
293 * Handles an Entries commit.
294 *
295 * @param commit Entries commit
296 * @return a set of all key-value pairs in the multimap
297 */
298 protected Collection<Map.Entry<String, byte[]>> entries(Commit<Void> commit) {
299 return backingMap
300 .entrySet()
301 .stream()
302 .collect(new EntrySetCollector());
303 }
304
305 /**
306 * Handles a Get commit.
307 *
308 * @param commit Get commit
309 * @return the collection of values associated with the key or an empty
310 * list if none exist
311 */
312 protected Versioned<Collection<? extends byte[]>> get(Commit<? extends Get> commit) {
313 return toVersioned(backingMap.get(commit.value().key()));
314 }
315
316 /**
317 * Handles a removeAll commit, and returns the previous mapping.
318 *
319 * @param commit removeAll commit
320 * @return collection of removed values
321 */
322 protected Versioned<Collection<? extends byte[]>> removeAll(Commit<? extends RemoveAll> commit) {
323 String key = commit.value().key();
324
325 if (!backingMap.containsKey(key)) {
326 return new Versioned<>(Sets.newHashSet(), -1);
327 }
328
329 Versioned<Collection<? extends byte[]>> removedValues =
330 backingMap.get(key).addCommit(commit);
331 publish(removedValues.value().stream()
332 .map(value -> new MultimapEvent<String, byte[]>(
333 "", key, null, value))
334 .collect(Collectors.toList()));
335 return removedValues;
336 }
337
338 /**
339 * Handles a multiRemove commit, returns true if the remove results in any
340 * change.
341 * @param commit multiRemove commit
342 * @return true if any change results, else false
343 */
344 protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
345 String key = commit.value().key();
346
347 if (!backingMap.containsKey(key)) {
348 return false;
349 }
350
351 Versioned<Collection<? extends byte[]>> removedValues = backingMap
352 .get(key)
353 .addCommit(commit);
354
355 if (removedValues != null) {
356 if (removedValues.value().isEmpty()) {
357 backingMap.remove(key);
358 }
359
360 publish(removedValues.value().stream()
361 .map(value -> new MultimapEvent<String, byte[]>(
362 "", key, null, value))
363 .collect(Collectors.toList()));
364 return true;
365 }
366
367 return false;
368 }
369
370 /**
371 * Handles a put commit, returns true if any change results from this
372 * commit.
373 * @param commit a put commit
374 * @return true if this commit results in a change, else false
375 */
376 protected boolean put(Commit<? extends Put> commit) {
377 String key = commit.value().key();
378 if (commit.value().values().isEmpty()) {
379 return false;
380 }
381 if (!backingMap.containsKey(key)) {
382 backingMap.put(key, new NonTransactionalCommit());
383 }
384
385 Versioned<Collection<? extends byte[]>> addedValues = backingMap
386 .get(key)
387 .addCommit(commit);
388
389 if (addedValues != null) {
390 publish(addedValues.value().stream()
391 .map(value -> new MultimapEvent<String, byte[]>(
392 "", key, value, null))
393 .collect(Collectors.toList()));
394 return true;
395 }
396
397 return false;
398 }
399
400 protected Versioned<Collection<? extends byte[]>> replace(
401 Commit<? extends Replace> commit) {
402 if (!backingMap.containsKey(commit.value().key())) {
403 backingMap.put(commit.value().key(),
404 new NonTransactionalCommit());
405 }
406 return backingMap.get(commit.value().key()).addCommit(commit);
407 }
408
409 /**
410 * Handles a listen commit.
411 *
412 * @param commit listen commit
413 */
414 protected void listen(Commit<Void> commit) {
415 listeners.put(commit.session().sessionId().id(), commit.session());
416 }
417
418 /**
419 * Handles an unlisten commit.
420 *
421 * @param commit unlisten commit
422 */
423 protected void unlisten(Commit<Void> commit) {
424 listeners.remove(commit.session().sessionId().id());
425 }
426
427 /**
428 * Publishes events to listeners.
429 *
430 * @param events list of map event to publish
431 */
432 private void publish(List<MultimapEvent<String, byte[]>> events) {
433 listeners.values().forEach(session -> session.publish(CHANGE, serializer::encode, events));
434 }
435
436 private interface MapEntryValue {
437
438 /**
439 * Returns the list of raw {@code byte[]'s}.
440 *
441 * @return list of raw values
442 */
443 Collection<? extends byte[]> values();
444
445 /**
446 * Returns the version of the value.
447 *
448 * @return version
449 */
450 long version();
451
452 /**
453 * Add a new commit and modifies the set of values accordingly.
454 * In the case of a replace or removeAll it returns the set of removed
455 * values. In the case of put or multiRemove it returns null for no
456 * change and a set of the added or removed values respectively if a
457 * change resulted.
458 *
459 * @param commit the commit to be added
460 */
461 Versioned<Collection<? extends byte[]>> addCommit(
462 Commit<? extends MultimapOperation> commit);
463 }
464
465 private class NonTransactionalCommit implements MapEntryValue {
466 private long version;
467 private final TreeSet<byte[]> valueSet = Sets.newTreeSet(new ByteArrayComparator());
468
469 public NonTransactionalCommit() {
470 //Set the version to current it will only be updated once this is
471 // populated
472 this.version = globalVersion.get();
473 }
474
475 @Override
476 public Collection<? extends byte[]> values() {
477 return ImmutableSet.copyOf(valueSet);
478 }
479
480 @Override
481 public long version() {
482 return version;
483 }
484
485 @Override
486 public Versioned<Collection<? extends byte[]>> addCommit(
487 Commit<? extends MultimapOperation> commit) {
488 Preconditions.checkNotNull(commit);
489 Preconditions.checkNotNull(commit.value());
490 Versioned<Collection<? extends byte[]>> retVersion;
491
492 if (commit.value() instanceof Put) {
493 //Using a treeset here sanitizes the input, removing duplicates
494 Set<byte[]> valuesToAdd =
495 Sets.newTreeSet(new ByteArrayComparator());
496 ((Put) commit.value()).values().forEach(value -> {
497 if (!valueSet.contains(value)) {
498 valuesToAdd.add(value);
499 }
500 });
501 if (valuesToAdd.isEmpty()) {
502 //Do not increment or add the commit if no change resulted
503 return null;
504 }
505 retVersion = new Versioned<>(valuesToAdd, version);
506 valuesToAdd.forEach(value -> valueSet.add(value));
507 version++;
508 return retVersion;
509
510 } else if (commit.value() instanceof Replace) {
511 //Will this work?? Need to check before check-in!
512 Set<byte[]> removedValues = Sets.newHashSet();
513 removedValues.addAll(valueSet);
514 retVersion = new Versioned<>(removedValues, version);
515 valueSet.clear();
516 Set<byte[]> valuesToAdd =
517 Sets.newTreeSet(new ByteArrayComparator());
518 ((Replace) commit.value()).values().forEach(value -> {
519 valuesToAdd.add(value);
520 });
521 if (valuesToAdd.isEmpty()) {
522 version = globalVersion.incrementAndGet();
523 backingMap.remove(((Replace) commit.value()).key());
524 return retVersion;
525 }
526 valuesToAdd.forEach(value -> valueSet.add(value));
527 version = globalVersion.incrementAndGet();
528 return retVersion;
529
530 } else if (commit.value() instanceof RemoveAll) {
531 Set<byte[]> removed = Sets.newHashSet();
532 //We can assume here that values only appear once and so we
533 //do not need to sanitize the return for duplicates.
534 removed.addAll(valueSet);
535 retVersion = new Versioned<>(removed, version);
536 valueSet.clear();
537 //In the case of a removeAll all commits will be removed and
538 //unlike the multiRemove case we do not need to consider
539 //dependencies among additive and removal commits.
540
541 //Save the key for use after the commit is closed
542 String key = ((RemoveAll) commit.value()).key();
543 version = globalVersion.incrementAndGet();
544 backingMap.remove(key);
545 return retVersion;
546
547 } else if (commit.value() instanceof MultiRemove) {
548 //Must first calculate how many commits the removal depends on.
549 //At this time we also sanitize the removal set by adding to a
550 //set with proper handling of byte[] equality.
551 Set<byte[]> removed = Sets.newHashSet();
552 ((MultiRemove) commit.value()).values().forEach(value -> {
553 if (valueSet.contains(value)) {
554 removed.add(value);
555 }
556 });
557 //If there is nothing to be removed no action should be taken.
558 if (removed.isEmpty()) {
559 return null;
560 }
561 //Save key in case countdown results in closing the commit.
562 String removedKey = ((MultiRemove) commit.value()).key();
563 removed.forEach(removedValue -> {
564 valueSet.remove(removedValue);
565 });
566 //The version is updated locally as well as globally even if
567 //this object will be removed from the map in case any other
568 //party still holds a reference to this object.
569 retVersion = new Versioned<>(removed, version);
570 version = globalVersion.incrementAndGet();
571 if (valueSet.isEmpty()) {
572 backingMap.remove(removedKey);
573 }
574 return retVersion;
575
576 } else {
577 throw new IllegalArgumentException();
578 }
579 }
580 }
581
582 /**
583 * A collector that creates MapEntryValues and creates a multiset of all
584 * values in the map an equal number of times to the number of sets in
585 * which they participate.
586 */
587 private class HashMultisetValueCollector implements
588 Collector<MapEntryValue,
589 HashMultiset<byte[]>,
590 HashMultiset<byte[]>> {
591
592 @Override
593 public Supplier<HashMultiset<byte[]>> supplier() {
594 return HashMultiset::create;
595 }
596
597 @Override
598 public BiConsumer<HashMultiset<byte[]>, MapEntryValue> accumulator() {
599 return (multiset, mapEntryValue) ->
600 multiset.addAll(mapEntryValue.values());
601 }
602
603 @Override
604 public BinaryOperator<HashMultiset<byte[]>> combiner() {
605 return (setOne, setTwo) -> {
606 setOne.addAll(setTwo);
607 return setOne;
608 };
609 }
610
611 @Override
612 public Function<HashMultiset<byte[]>,
613 HashMultiset<byte[]>> finisher() {
614 return Function.identity();
615 }
616
617 @Override
618 public Set<Characteristics> characteristics() {
619 return EnumSet.of(Characteristics.UNORDERED);
620 }
621 }
622
623 /**
624 * A collector that creates Entries of {@code <String, MapEntryValue>} and
625 * creates a set of entries all key value pairs in the map.
626 */
627 private class EntrySetCollector implements
628 Collector<Map.Entry<String, MapEntryValue>,
629 Set<Map.Entry<String, byte[]>>,
630 Set<Map.Entry<String, byte[]>>> {
631 private Set<Map.Entry<String, byte[]>> set = null;
632
633 @Override
634 public Supplier<Set<Map.Entry<String, byte[]>>> supplier() {
635 return () -> {
636 if (set == null) {
637 set = Sets.newHashSet();
638 }
639 return set;
640 };
641 }
642
643 @Override
644 public BiConsumer<Set<Map.Entry<String, byte[]>>,
645 Map.Entry<String, MapEntryValue>> accumulator() {
646 return (set, entry) -> {
647 entry
648 .getValue()
649 .values()
650 .forEach(byteValue ->
651 set.add(Maps.immutableEntry(entry.getKey(),
652 byteValue)));
653 };
654 }
655
656 @Override
657 public BinaryOperator<Set<Map.Entry<String, byte[]>>> combiner() {
658 return (setOne, setTwo) -> {
659 setOne.addAll(setTwo);
660 return setOne;
661 };
662 }
663
664 @Override
665 public Function<Set<Map.Entry<String, byte[]>>,
666 Set<Map.Entry<String, byte[]>>> finisher() {
667 return (unused) -> set;
668 }
669
670 @Override
671 public Set<Characteristics> characteristics() {
672 return EnumSet.of(Characteristics.UNORDERED);
673 }
674 }
675 /**
676 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
677 * @param value map entry value
678 * @return versioned instance or an empty list versioned -1 if argument is
679 * null
680 */
681 private Versioned<Collection<? extends byte[]>> toVersioned(
682 MapEntryValue value) {
683 return value == null ? new Versioned<>(Lists.newArrayList(), -1) :
684 new Versioned<>(value.values(),
685 value.version());
686 }
687
688 private static class ByteArrayComparator implements Comparator<byte[]> {
689
690 @Override
691 public int compare(byte[] o1, byte[] o2) {
692 if (Arrays.equals(o1, o2)) {
693 return 0;
694 } else {
695 for (int i = 0; i < o1.length && i < o2.length; i++) {
696 if (o1[i] < o2[i]) {
697 return -1;
698 } else if (o1[i] > o2[i]) {
699 return 1;
700 }
701 }
702 return o1.length > o2.length ? 1 : -1;
703 }
704 }
705 }
706}