blob: 47972d17c1ccabad09583ebd984e7075d83b4b57 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import com.google.common.base.Preconditions;
20import com.google.common.collect.HashMultimap;
21import com.google.common.collect.HashMultiset;
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070022import com.google.common.collect.ImmutableSet;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -070023import com.google.common.collect.Lists;
24import com.google.common.collect.Maps;
25import com.google.common.collect.Multiset;
26import com.google.common.collect.Sets;
27import io.atomix.copycat.server.Commit;
28import io.atomix.copycat.server.Snapshottable;
29import io.atomix.copycat.server.StateMachineExecutor;
Jonathan Hart46bf89b2017-02-27 15:56:42 -080030import io.atomix.copycat.server.session.ServerSession;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -070031import io.atomix.copycat.server.session.SessionListener;
32import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
33import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
34import io.atomix.resource.ResourceStateMachine;
35import org.onlab.util.CountDownCompleter;
36import org.onlab.util.Match;
Jonathan Hart46bf89b2017-02-27 15:56:42 -080037import org.onosproject.store.service.MultimapEvent;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -070038import org.onosproject.store.service.Versioned;
39import org.slf4j.Logger;
40
41import java.util.Arrays;
42import java.util.Collection;
43import java.util.Comparator;
44import java.util.EnumSet;
Jonathan Hart46bf89b2017-02-27 15:56:42 -080045import java.util.HashMap;
46import java.util.List;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -070047import java.util.Map;
48import java.util.Properties;
49import java.util.Set;
50import java.util.TreeMap;
51import java.util.concurrent.atomic.AtomicLong;
52import java.util.function.BiConsumer;
53import java.util.function.BinaryOperator;
54import java.util.function.Function;
55import java.util.function.Supplier;
56import java.util.stream.Collector;
57import java.util.stream.Collectors;
58
Aaron Kruglikova1801aa2016-07-12 15:17:30 -070059import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Clear;
60import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.ContainsEntry;
61import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.ContainsKey;
62import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.ContainsValue;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Entries;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Get;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.IsEmpty;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.KeySet;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Keys;
Jonathan Hart46bf89b2017-02-27 15:56:42 -080068import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Listen;
Aaron Kruglikova1801aa2016-07-12 15:17:30 -070069import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultiRemove;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultimapCommand;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Put;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.RemoveAll;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Replace;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Size;
Jonathan Hart46bf89b2017-02-27 15:56:42 -080075import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Unlisten;
Aaron Kruglikova1801aa2016-07-12 15:17:30 -070076import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Values;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -070077import static org.slf4j.LoggerFactory.getLogger;
78
79/**
Aaron Kruglikova1801aa2016-07-12 15:17:30 -070080 * State Machine for {@link AtomixConsistentSetMultimap} resource.
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -070081 */
Aaron Kruglikova1801aa2016-07-12 15:17:30 -070082public class AtomixConsistentSetMultimapState extends ResourceStateMachine
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -070083 implements SessionListener, Snapshottable {
84
// Logger named after the runtime class of this state machine.
private final Logger log = getLogger(getClass());
// Shared version counter, starting at 1; entries read and bump it on changes.
private final AtomicLong globalVersion = new AtomicLong(1);
// Listen commits of the sessions subscribed to change events, by session id.
private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
// Multimap contents: each key maps to the holder of its current values.
private final Map<String, MapEntryValue> backingMap = Maps.newHashMap();
89
/**
 * Creates the state machine, passing the given properties to the parent
 * resource state machine.
 *
 * @param properties resource configuration properties
 */
public AtomixConsistentSetMultimapState(Properties properties) {
    super(properties);
}
93
/**
 * Snapshot hook. The body is empty, so nothing is written to the snapshot.
 *
 * @param writer snapshot writer (unused)
 */
@Override
public void snapshot(SnapshotWriter writer) {
}
97
/**
 * Snapshot installation hook. The body is empty, so nothing is read back
 * from the snapshot.
 *
 * @param reader snapshot reader (unused)
 */
@Override
public void install(SnapshotReader reader) {
}
101
/**
 * Registers one handler method of this class per multimap operation type.
 *
 * @param executor executor on which the operation handlers are registered
 */
@Override
protected void configure(StateMachineExecutor executor) {
    // Read-only queries and state-changing commands on the multimap.
    executor.register(Size.class, this::size);
    executor.register(IsEmpty.class, this::isEmpty);
    executor.register(ContainsKey.class, this::containsKey);
    executor.register(ContainsValue.class, this::containsValue);
    executor.register(ContainsEntry.class, this::containsEntry);
    executor.register(Clear.class, this::clear);
    executor.register(KeySet.class, this::keySet);
    executor.register(Keys.class, this::keys);
    executor.register(Values.class, this::values);
    executor.register(Entries.class, this::entries);
    executor.register(Get.class, this::get);
    executor.register(RemoveAll.class, this::removeAll);
    executor.register(MultiRemove.class, this::multiRemove);
    executor.register(Put.class, this::put);
    executor.register(Replace.class, this::replace);
    // Change-event subscription management.
    executor.register(Listen.class, this::listen);
    executor.register(Unlisten.class, this::unlisten);
}
122
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700123 /**
124 * Handles a Size commit.
125 *
126 * @param commit Size commit
127 * @return number of unique key value pairs in the multimap
128 */
129 protected int size(Commit<? extends Size> commit) {
130 try {
131 return backingMap.values()
132 .stream()
133 .map(valueCollection -> valueCollection.values().size())
134 .collect(Collectors.summingInt(size -> size));
135 } finally {
136 commit.close();
137 }
138 }
139
/**
 * Handles an IsEmpty commit.
 *
 * @param commit IsEmpty commit; always closed before returning
 * @return true if the multimap contains no key-value pairs, else false
 */
protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
    try {
        return backingMap.isEmpty();
    } finally {
        commit.close();
    }
}
153
154 /**
155 * Handles a contains key commit.
156 *
157 * @param commit ContainsKey commit
158 * @return returns true if the key is in the multimap, else false
159 */
160 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
161 try {
162 return backingMap.containsKey(commit.operation().key());
163 } finally {
164 commit.close();
165 }
166 }
167
168 /**
169 * Handles a ContainsValue commit.
170 *
171 * @param commit ContainsValue commit
172 * @return true if the value is in the multimap, else false
173 */
174 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
175 try {
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700176 if (backingMap.values().isEmpty()) {
177 return false;
178 }
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700179 Match<byte[]> match = Match.ifValue(commit.operation().value());
180 return backingMap
181 .values()
182 .stream()
183 .anyMatch(valueList ->
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700184 valueList
185 .values()
186 .stream()
187 .anyMatch(byteValue ->
188 match.matches(byteValue)));
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700189 } finally {
190 commit.close();
191 }
192 }
193
194 /**
195 * Handles a ContainsEntry commit.
196 *
197 * @param commit ContainsEntry commit
198 * @return true if the key-value pair exists, else false
199 */
200 protected boolean containsEntry(Commit<? extends ContainsEntry> commit) {
201 try {
202 MapEntryValue entryValue =
203 backingMap.get(commit.operation().key());
204 if (entryValue == null) {
205 return false;
206 } else {
207 Match valueMatch = Match.ifValue(commit.operation().value());
208 return entryValue
209 .values()
210 .stream()
211 .anyMatch(byteValue -> valueMatch.matches(byteValue));
212 }
213 } finally {
214 commit.close();
215 }
216 }
217
/**
 * Handles a Clear commit by removing every key from the backing map.
 *
 * @param commit Clear commit; always closed before returning
 */
protected void clear(Commit<? extends Clear> commit) {
    try {
        // NOTE(review): the dropped entries are not discard()ed and no
        // events are published here - confirm that this is intended.
        backingMap.clear();
    } finally {
        commit.close();
    }
}
230
/**
 * Handles a KeySet commit.
 *
 * @param commit KeySet commit; always closed before returning
 * @return an immutable copy of the set of all keys in the multimap
 */
protected Set<String> keySet(Commit<? extends KeySet> commit) {
    try {
        return ImmutableSet.copyOf(backingMap.keySet());
    } finally {
        commit.close();
    }
}
244
245 /**
246 * Handles a Keys commit.
247 *
248 * @param commit Keys commit
249 * @return a multiset of keys with each key included an equal number of
250 * times to the total key-value pairs in which that key participates
251 */
252 protected Multiset<String> keys(Commit<? extends Keys> commit) {
253 try {
254 Multiset keys = HashMultiset.create();
255 backingMap.forEach((key, mapEntryValue) -> {
256 keys.add(key, mapEntryValue.values().size());
257 });
258 return keys;
259 } finally {
260 commit.close();
261 }
262 }
263
264 /**
265 * Handles a Values commit.
266 *
267 * @param commit Values commit
268 * @return the set of values in the multimap with duplicates included
269 */
270 protected Multiset<byte[]> values(Commit<? extends Values> commit) {
271 try {
272 return backingMap
273 .values()
274 .stream()
275 .collect(new HashMultisetValueCollector());
276 } finally {
277 commit.close();
278 }
279 }
280
281 /**
282 * Handles an Entries commit.
283 *
284 * @param commit Entries commit
285 * @return a set of all key-value pairs in the multimap
286 */
287 protected Collection<Map.Entry<String, byte[]>> entries(
288 Commit<? extends Entries> commit) {
289 try {
290 return backingMap
291 .entrySet()
292 .stream()
293 .collect(new EntrySetCollector());
294 } finally {
295 commit.close();
296 }
297 }
298
299 /**
300 * Handles a Get commit.
301 *
302 * @param commit Get commit
303 * @return the collection of values associated with the key or an empty
304 * list if none exist
305 */
306 protected Versioned<Collection<? extends byte[]>> get(
307 Commit<? extends Get> commit) {
308 try {
309 MapEntryValue mapEntryValue = backingMap.get(commit.operation().key());
310 return toVersioned(backingMap.get(commit.operation().key()));
311 } finally {
312 commit.close();
313 }
314 }
315
316 /**
317 * Handles a removeAll commit, and returns the previous mapping.
318 *
319 * @param commit removeAll commit
320 * @return collection of removed values
321 */
322 protected Versioned<Collection<? extends byte[]>> removeAll(
323 Commit<? extends RemoveAll> commit) {
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800324 String key = commit.operation().key();
325
326 if (!backingMap.containsKey(key)) {
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700327 commit.close();
328 return new Versioned<>(Sets.newHashSet(), -1);
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700329 }
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800330
331 Versioned<Collection<? extends byte[]>> removedValues =
332 backingMap.get(key).addCommit(commit);
333 publish(removedValues.value().stream()
334 .map(value -> new MultimapEvent<String, byte[]>(
335 "", key, null, value))
336 .collect(Collectors.toList()));
337 return removedValues;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700338 }
339
340 /**
341 * Handles a multiRemove commit, returns true if the remove results in any
342 * change.
343 * @param commit multiRemove commit
344 * @return true if any change results, else false
345 */
346 protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800347 String key = commit.operation().key();
348
349 if (!backingMap.containsKey(key)) {
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700350 commit.close();
351 return false;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700352 }
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800353
354 Versioned<Collection<? extends byte[]>> removedValues = backingMap
355 .get(key)
356 .addCommit(commit);
357
358 if (removedValues != null) {
359 publish(removedValues.value().stream()
360 .map(value -> new MultimapEvent<String, byte[]>(
361 "", key, null, value))
362 .collect(Collectors.toList()));
363 return true;
364 }
365
366 return false;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700367 }
368
369 /**
370 * Handles a put commit, returns true if any change results from this
371 * commit.
372 * @param commit a put commit
373 * @return true if this commit results in a change, else false
374 */
375 protected boolean put(Commit<? extends Put> commit) {
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800376 String key = commit.operation().key();
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700377 if (commit.operation().values().isEmpty()) {
378 return false;
379 }
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800380 if (!backingMap.containsKey(key)) {
381 backingMap.put(key, new NonTransactionalCommit(1));
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700382 }
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800383
384 Versioned<Collection<? extends byte[]>> addedValues = backingMap
385 .get(key)
386 .addCommit(commit);
387
388 if (addedValues != null) {
389 publish(addedValues.value().stream()
390 .map(value -> new MultimapEvent<String, byte[]>(
391 "", key, value, null))
392 .collect(Collectors.toList()));
393 return true;
394 }
395
396 return false;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700397 }
398
399 protected Versioned<Collection<? extends byte[]>> replace(
400 Commit<? extends Replace> commit) {
401 if (!backingMap.containsKey(commit.operation().key())) {
402 backingMap.put(commit.operation().key(),
403 new NonTransactionalCommit(1));
404 }
405 return backingMap.get(commit.operation().key()).addCommit(commit);
406 }
407
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800408 /**
409 * Handles a listen commit.
410 *
411 * @param commit listen commit
412 */
413 protected void listen(Commit<? extends Listen> commit) {
414 Long sessionId = commit.session().id();
415 if (listeners.putIfAbsent(sessionId, commit) != null) {
416 commit.close();
417 return;
418 }
419 commit.session()
420 .onStateChange(
421 state -> {
422 if (state == ServerSession.State.CLOSED
423 || state == ServerSession.State.EXPIRED) {
424 Commit<? extends Listen> listener = listeners.remove(sessionId);
425 if (listener != null) {
426 listener.close();
427 }
428 }
429 });
430 }
431
/**
 * Handles an unlisten commit by removing the listener registered for the
 * commit's session, if any, and closing its listen commit.
 *
 * @param commit unlisten commit; always closed before returning
 */
protected void unlisten(Commit<? extends Unlisten> commit) {
    try {
        Commit<? extends Listen> listener = listeners.remove(commit.session().id());
        // The session may never have registered a listener.
        if (listener != null) {
            listener.close();
        }
    } finally {
        commit.close();
    }
}
447
/**
 * Publishes events to listeners. The same list is sent to the session of
 * every registered listen commit under
 * {@code AtomixConsistentSetMultimap.CHANGE_SUBJECT}.
 *
 * @param events list of map event to publish
 */
private void publish(List<MultimapEvent<String, byte[]>> events) {
    listeners.values().forEach(commit ->
            commit.session().publish(AtomixConsistentSetMultimap.CHANGE_SUBJECT, events));
}
457
/**
 * Holder of the values stored under a single multimap key.
 */
private interface MapEntryValue {

    /**
     * Returns the list of raw {@code byte[]'s}.
     *
     * @return list of raw values
     */
    Collection<? extends byte[]> values();

    /**
     * Returns the version of the value.
     *
     * @return version
     */
    long version();

    /**
     * Discards the value by invoking appropriate clean up actions.
     */
    void discard();

    /**
     * Add a new commit and modifies the set of values accordingly.
     * In the case of a replace or removeAll it returns the set of removed
     * values. In the case of put or multiRemove it returns null for no
     * change and a set of the added or removed values respectively if a
     * change resulted.
     *
     * @param commit the commit to be added
     * @return the versioned set of affected values as described above, or
     * null when a put or multiRemove changed nothing
     */
    Versioned<Collection<? extends byte[]>> addCommit(
            Commit<? extends MultimapCommand> commit);
}
491
492 private class NonTransactionalCommit implements MapEntryValue {
493 private long version;
494 private final TreeMap<byte[], CountDownCompleter<Commit>>
495 valueCountdownMap = Maps.newTreeMap(new ByteArrayComparator());
496 /*This is a mapping of commits that added values to the commits
497 * removing those values, they will not be circular because keys will
498 * be exclusively Put and Replace commits and values will be exclusively
499 * Multiremove commits, each time a Put or replace is removed it should
500 * as part of closing go through and countdown each of the remove
501 * commits depending on it.*/
502 private final HashMultimap<Commit, CountDownCompleter<Commit>>
503 additiveToRemovalCommits = HashMultimap.create();
504
505 public NonTransactionalCommit(
506 long version) {
507 //Set the version to current it will only be updated once this is
508 // populated
509 this.version = globalVersion.get();
510 }
511
512 @Override
513 public Collection<? extends byte[]> values() {
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700514 return ImmutableSet.copyOf(valueCountdownMap.keySet());
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700515 }
516
517 @Override
518 public long version() {
519 return version;
520 }
521
522 @Override
523 public void discard() {
524 valueCountdownMap.values().forEach(completer ->
525 completer.object().close());
526 }
527
528 @Override
529 public Versioned<Collection<? extends byte[]>> addCommit(
530 Commit<? extends MultimapCommand> commit) {
531 Preconditions.checkNotNull(commit);
532 Preconditions.checkNotNull(commit.operation());
533 Versioned<Collection<? extends byte[]>> retVersion;
534
535 if (commit.operation() instanceof Put) {
536 //Using a treeset here sanitizes the input, removing duplicates
537 Set<byte[]> valuesToAdd =
538 Sets.newTreeSet(new ByteArrayComparator());
539 ((Put) commit.operation()).values().forEach(value -> {
540 if (!valueCountdownMap.containsKey(value)) {
541 valuesToAdd.add(value);
542 }
543 });
544 if (valuesToAdd.isEmpty()) {
545 //Do not increment or add the commit if no change resulted
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700546 commit.close();
547 return null;
548 }
549 //When all values from a commit have been removed decrement all
550 //removal commits relying on it and remove itself from the
551 //mapping of additive commits to the commits removing the
552 //values it added. (Only multiremoves will be dependent)
553 CountDownCompleter<Commit> completer =
554 new CountDownCompleter<>(commit, valuesToAdd.size(),
555 c -> {
556 if (additiveToRemovalCommits.containsKey(c)) {
557 additiveToRemovalCommits.
558 get(c).
559 forEach(countdown ->
560 countdown.countDown());
561 additiveToRemovalCommits.removeAll(c);
562 }
563 c.close();
564 });
565 retVersion = new Versioned<>(valuesToAdd, version);
566 valuesToAdd.forEach(value -> valueCountdownMap.put(value,
567 completer));
568 version++;
569 return retVersion;
570
571 } else if (commit.operation() instanceof Replace) {
572 //Will this work?? Need to check before check-in!
573 Set<byte[]> removedValues = Sets.newHashSet();
574 removedValues.addAll(valueCountdownMap.keySet());
575 retVersion = new Versioned<>(removedValues, version);
576 valueCountdownMap.values().forEach(countdown ->
577 countdown.countDown());
578 valueCountdownMap.clear();
579 Set<byte[]> valuesToAdd =
580 Sets.newTreeSet(new ByteArrayComparator());
581 ((Replace) commit.operation()).values().forEach(value -> {
582 valuesToAdd.add(value);
583 });
584 if (valuesToAdd.isEmpty()) {
585 version = globalVersion.incrementAndGet();
586 backingMap.remove(((Replace) commit.operation()).key());
587 //Order is important here, the commit must be closed last
588 //(or minimally after all uses)
589 commit.close();
590 return retVersion;
591 }
592 CountDownCompleter<Commit> completer =
593 new CountDownCompleter<>(commit, valuesToAdd.size(),
594 c -> {
595 if (additiveToRemovalCommits
596 .containsKey(c)) {
597 additiveToRemovalCommits.
598 get(c).
599 forEach(countdown ->
600 countdown.countDown());
601 additiveToRemovalCommits.
602 removeAll(c);
603 }
604 c.close();
605 });
606 valuesToAdd.forEach(value ->
607 valueCountdownMap.put(value, completer));
608 version = globalVersion.incrementAndGet();
609 return retVersion;
610
611 } else if (commit.operation() instanceof RemoveAll) {
612 Set<byte[]> removed = Sets.newHashSet();
613 //We can assume here that values only appear once and so we
614 //do not need to sanitize the return for duplicates.
615 removed.addAll(valueCountdownMap.keySet());
616 retVersion = new Versioned<>(removed, version);
617 valueCountdownMap.values().forEach(countdown ->
618 countdown.countDown());
619 valueCountdownMap.clear();
620 //In the case of a removeAll all commits will be removed and
621 //unlike the multiRemove case we do not need to consider
622 //dependencies among additive and removal commits.
623
624 //Save the key for use after the commit is closed
625 String key = ((RemoveAll) commit.operation()).key();
626 commit.close();
627 version = globalVersion.incrementAndGet();
628 backingMap.remove(key);
629 return retVersion;
630
631 } else if (commit.operation() instanceof MultiRemove) {
632 //Must first calculate how many commits the removal depends on.
633 //At this time we also sanitize the removal set by adding to a
634 //set with proper handling of byte[] equality.
635 Set<byte[]> removed = Sets.newHashSet();
636 Set<Commit> commitsRemovedFrom = Sets.newHashSet();
637 ((MultiRemove) commit.operation()).values().forEach(value -> {
638 if (valueCountdownMap.containsKey(value)) {
639 removed.add(value);
640 commitsRemovedFrom
641 .add(valueCountdownMap.get(value).object());
642 }
643 });
644 //If there is nothing to be removed no action should be taken.
645 if (removed.isEmpty()) {
646 //Do not increment or add the commit if no change resulted
647 commit.close();
648 return null;
649 }
650 //When all additive commits this depends on are closed this can
651 //be closed as well.
652 CountDownCompleter<Commit> completer =
653 new CountDownCompleter<>(commit,
654 commitsRemovedFrom.size(),
655 c -> c.close());
656 commitsRemovedFrom.forEach(commitRemovedFrom -> {
657 additiveToRemovalCommits.put(commitRemovedFrom, completer);
658 });
659 //Save key in case countdown results in closing the commit.
660 String removedKey = ((MultiRemove) commit.operation()).key();
661 removed.forEach(removedValue -> {
662 valueCountdownMap.remove(removedValue).countDown();
663 });
664 //The version is updated locally as well as globally even if
665 //this object will be removed from the map in case any other
666 //party still holds a reference to this object.
667 retVersion = new Versioned<>(removed, version);
668 version = globalVersion.incrementAndGet();
669 if (valueCountdownMap.isEmpty()) {
670 backingMap
671 .remove(removedKey);
672 }
673 return retVersion;
674
675 } else {
676 throw new IllegalArgumentException();
677 }
678 }
679 }
680
681 /**
682 * A collector that creates MapEntryValues and creates a multiset of all
683 * values in the map an equal number of times to the number of sets in
684 * which they participate.
685 */
686 private class HashMultisetValueCollector implements
687 Collector<MapEntryValue,
688 HashMultiset<byte[]>,
689 HashMultiset<byte[]>> {
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700690
691 @Override
692 public Supplier<HashMultiset<byte[]>> supplier() {
Jonathan Hartad0c3022017-02-22 14:06:01 -0800693 return HashMultiset::create;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700694 }
695
696 @Override
697 public BiConsumer<HashMultiset<byte[]>, MapEntryValue> accumulator() {
698 return (multiset, mapEntryValue) ->
699 multiset.addAll(mapEntryValue.values());
700 }
701
702 @Override
703 public BinaryOperator<HashMultiset<byte[]>> combiner() {
704 return (setOne, setTwo) -> {
705 setOne.addAll(setTwo);
706 return setOne;
707 };
708 }
709
710 @Override
711 public Function<HashMultiset<byte[]>,
712 HashMultiset<byte[]>> finisher() {
Jonathan Hartad0c3022017-02-22 14:06:01 -0800713 return Function.identity();
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700714 }
715
716 @Override
717 public Set<Characteristics> characteristics() {
718 return EnumSet.of(Characteristics.UNORDERED);
719 }
720 }
721
722 /**
723 * A collector that creates Entries of {@code <String, MapEntryValue>} and
724 * creates a set of entries all key value pairs in the map.
725 */
726 private class EntrySetCollector implements
727 Collector<Map.Entry<String, MapEntryValue>,
728 Set<Map.Entry<String, byte[]>>,
729 Set<Map.Entry<String, byte[]>>> {
730 private Set<Map.Entry<String, byte[]>> set = null;
731
732 @Override
733 public Supplier<Set<Map.Entry<String, byte[]>>> supplier() {
Sho SHIMIZU99e90cd2016-08-12 15:55:16 -0700734 return () -> {
735 if (set == null) {
736 set = Sets.newHashSet();
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700737 }
Sho SHIMIZU99e90cd2016-08-12 15:55:16 -0700738 return set;
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700739 };
740 }
741
742 @Override
743 public BiConsumer<Set<Map.Entry<String, byte[]>>,
744 Map.Entry<String, MapEntryValue>> accumulator() {
745 return (set, entry) -> {
746 entry
747 .getValue()
748 .values()
749 .forEach(byteValue ->
750 set.add(Maps.immutableEntry(entry.getKey(),
751 byteValue)));
752 };
753 }
754
755 @Override
756 public BinaryOperator<Set<Map.Entry<String, byte[]>>> combiner() {
757 return (setOne, setTwo) -> {
758 setOne.addAll(setTwo);
759 return setOne;
760 };
761 }
762
763 @Override
764 public Function<Set<Map.Entry<String, byte[]>>,
765 Set<Map.Entry<String, byte[]>>> finisher() {
766 return (unused) -> set;
767 }
768
769 @Override
770 public Set<Characteristics> characteristics() {
771 return EnumSet.of(Characteristics.UNORDERED);
772 }
773 }
774 /**
775 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
776 * @param value map entry value
777 * @return versioned instance or an empty list versioned -1 if argument is
778 * null
779 */
780 private Versioned<Collection<? extends byte[]>> toVersioned(
781 MapEntryValue value) {
782 return value == null ? new Versioned<>(Lists.newArrayList(), -1) :
783 new Versioned<>(value.values(),
784 value.version());
785 }
786
787 private class ByteArrayComparator implements Comparator<byte[]> {
788
789 @Override
790 public int compare(byte[] o1, byte[] o2) {
791 if (Arrays.equals(o1, o2)) {
792 return 0;
793 } else {
794 for (int i = 0; i < o1.length && i < o2.length; i++) {
795 if (o1[i] < o2[i]) {
796 return -1;
797 } else if (o1[i] > o2[i]) {
798 return 1;
799 }
800 }
801 return o1.length > o2.length ? 1 : -1;
802 }
803 }
804 }
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700805 }