blob: 878aac24bc046be9351c4e743670e20b70f14f3e [file] [log] [blame]
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -07001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import com.google.common.base.Preconditions;
20import com.google.common.collect.HashMultimap;
21import com.google.common.collect.HashMultiset;
22import com.google.common.collect.Lists;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Multiset;
25import com.google.common.collect.Sets;
26import io.atomix.copycat.server.Commit;
27import io.atomix.copycat.server.Snapshottable;
28import io.atomix.copycat.server.StateMachineExecutor;
29import io.atomix.copycat.server.session.ServerSession;
30import io.atomix.copycat.server.session.SessionListener;
31import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
32import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
33import io.atomix.resource.ResourceStateMachine;
34import org.onlab.util.CountDownCompleter;
35import org.onlab.util.Match;
36import org.onosproject.store.service.Versioned;
37import org.slf4j.Logger;
38
39import java.util.Arrays;
40import java.util.Collection;
41import java.util.Comparator;
42import java.util.EnumSet;
43import java.util.Map;
44import java.util.Properties;
45import java.util.Set;
46import java.util.TreeMap;
47import java.util.concurrent.atomic.AtomicLong;
48import java.util.function.BiConsumer;
49import java.util.function.BinaryOperator;
50import java.util.function.Function;
51import java.util.function.Supplier;
52import java.util.stream.Collector;
53import java.util.stream.Collectors;
54
55import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Clear;
56import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsEntry;
57import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsKey;
58import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsValue;
59import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Entries;
60import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Get;
61import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.IsEmpty;
62import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.KeySet;
63import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Keys;
64import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.MultiRemove;
65import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.MultimapCommand;
66import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Put;
67import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.RemoveAll;
68import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Replace;
69import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Size;
70import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Values;
71import static org.slf4j.LoggerFactory.getLogger;
72
73/**
74 * State Machine for {@link AsyncConsistentSetMultimap} resource.
75 */
76public class AsyncConsistentSetMultimapState extends ResourceStateMachine
77 implements SessionListener, Snapshottable {
78
79 private final Logger log = getLogger(getClass());
80 private final AtomicLong globalVersion = new AtomicLong(1);
81 //TODO Add listener map here
82 private final Map<String, MapEntryValue> backingMap = Maps.newHashMap();
83
84 public AsyncConsistentSetMultimapState(Properties properties) {
85 super(properties);
86 }
87
88 @Override
89 public void snapshot(SnapshotWriter writer) {
90 }
91
92 @Override
93 public void install(SnapshotReader reader) {
94 }
95
96 @Override
97 protected void configure(StateMachineExecutor executor) {
98 executor.register(Size.class, this::size);
99 executor.register(IsEmpty.class, this::isEmpty);
100 executor.register(ContainsKey.class, this::containsKey);
101 executor.register(ContainsValue.class, this::containsValue);
102 executor.register(ContainsEntry.class, this::containsEntry);
103 executor.register(Clear.class, this::clear);
104 executor.register(KeySet.class, this::keySet);
105 executor.register(Keys.class, this::keys);
106 executor.register(Values.class, this::values);
107 executor.register(Entries.class, this::entries);
108 executor.register(Get.class, this::get);
109 executor.register(RemoveAll.class, this::removeAll);
110 executor.register(MultiRemove.class, this::multiRemove);
111 executor.register(Put.class, this::put);
112 executor.register(Replace.class, this::replace);
113 }
114
115 @Override
116 public void delete() {
117 super.delete();
118 }
119
120 /**
121 * Handles a Size commit.
122 *
123 * @param commit Size commit
124 * @return number of unique key value pairs in the multimap
125 */
126 protected int size(Commit<? extends Size> commit) {
127 try {
128 return backingMap.values()
129 .stream()
130 .map(valueCollection -> valueCollection.values().size())
131 .collect(Collectors.summingInt(size -> size));
132 } finally {
133 commit.close();
134 }
135 }
136
137 /**
138 * Handles an IsEmpty commit.
139 *
140 * @param commit IsEmpty commit
141 * @return true if the multimap contains no key-value pairs, else false
142 */
143 protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
144 try {
145 return backingMap.isEmpty();
146 } finally {
147 commit.close();
148 }
149 }
150
151 /**
152 * Handles a contains key commit.
153 *
154 * @param commit ContainsKey commit
155 * @return returns true if the key is in the multimap, else false
156 */
157 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
158 try {
159 return backingMap.containsKey(commit.operation().key());
160 } finally {
161 commit.close();
162 }
163 }
164
165 /**
166 * Handles a ContainsValue commit.
167 *
168 * @param commit ContainsValue commit
169 * @return true if the value is in the multimap, else false
170 */
171 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
172 try {
173 Match<byte[]> match = Match.ifValue(commit.operation().value());
174 return backingMap
175 .values()
176 .stream()
177 .anyMatch(valueList ->
178 valueList
179 .values()
180 .stream()
181 .anyMatch(byteValue ->
182 match.matches(byteValue)));
183 } finally {
184 commit.close();
185 }
186 }
187
188 /**
189 * Handles a ContainsEntry commit.
190 *
191 * @param commit ContainsEntry commit
192 * @return true if the key-value pair exists, else false
193 */
194 protected boolean containsEntry(Commit<? extends ContainsEntry> commit) {
195 try {
196 MapEntryValue entryValue =
197 backingMap.get(commit.operation().key());
198 if (entryValue == null) {
199 return false;
200 } else {
201 Match valueMatch = Match.ifValue(commit.operation().value());
202 return entryValue
203 .values()
204 .stream()
205 .anyMatch(byteValue -> valueMatch.matches(byteValue));
206 }
207 } finally {
208 commit.close();
209 }
210 }
211
212 /**
213 * Handles a Clear commit.
214 *
215 * @param commit Clear commit
216 */
217 protected void clear(Commit<? extends Clear> commit) {
218 try {
219 backingMap.clear();
220 } finally {
221 commit.close();
222 }
223 }
224
225 /**
226 * Handles a KeySet commit.
227 *
228 * @param commit KeySet commit
229 * @return a set of all keys in the multimap
230 */
231 protected Set<String> keySet(Commit<? extends KeySet> commit) {
232 try {
233 return backingMap.keySet();
234 } finally {
235 commit.close();
236 }
237 }
238
239 /**
240 * Handles a Keys commit.
241 *
242 * @param commit Keys commit
243 * @return a multiset of keys with each key included an equal number of
244 * times to the total key-value pairs in which that key participates
245 */
246 protected Multiset<String> keys(Commit<? extends Keys> commit) {
247 try {
248 Multiset keys = HashMultiset.create();
249 backingMap.forEach((key, mapEntryValue) -> {
250 keys.add(key, mapEntryValue.values().size());
251 });
252 return keys;
253 } finally {
254 commit.close();
255 }
256 }
257
258 /**
259 * Handles a Values commit.
260 *
261 * @param commit Values commit
262 * @return the set of values in the multimap with duplicates included
263 */
264 protected Multiset<byte[]> values(Commit<? extends Values> commit) {
265 try {
266 return backingMap
267 .values()
268 .stream()
269 .collect(new HashMultisetValueCollector());
270 } finally {
271 commit.close();
272 }
273 }
274
275 /**
276 * Handles an Entries commit.
277 *
278 * @param commit Entries commit
279 * @return a set of all key-value pairs in the multimap
280 */
281 protected Collection<Map.Entry<String, byte[]>> entries(
282 Commit<? extends Entries> commit) {
283 try {
284 return backingMap
285 .entrySet()
286 .stream()
287 .collect(new EntrySetCollector());
288 } finally {
289 commit.close();
290 }
291 }
292
293 /**
294 * Handles a Get commit.
295 *
296 * @param commit Get commit
297 * @return the collection of values associated with the key or an empty
298 * list if none exist
299 */
300 protected Versioned<Collection<? extends byte[]>> get(
301 Commit<? extends Get> commit) {
302 try {
303 MapEntryValue mapEntryValue = backingMap.get(commit.operation().key());
304 return toVersioned(backingMap.get(commit.operation().key()));
305 } finally {
306 commit.close();
307 }
308 }
309
310 /**
311 * Handles a removeAll commit, and returns the previous mapping.
312 *
313 * @param commit removeAll commit
314 * @return collection of removed values
315 */
316 protected Versioned<Collection<? extends byte[]>> removeAll(
317 Commit<? extends RemoveAll> commit) {
318 if (!backingMap.containsKey(commit.operation().key())) {
319 commit.close();
320 return new Versioned<>(Sets.newHashSet(), -1);
321 } else {
322 return backingMap.get(commit.operation().key()).addCommit(commit);
323 }
324 }
325
326 /**
327 * Handles a multiRemove commit, returns true if the remove results in any
328 * change.
329 * @param commit multiRemove commit
330 * @return true if any change results, else false
331 */
332 protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
333 if (!backingMap.containsKey(commit.operation().key())) {
334 commit.close();
335 return false;
336 } else {
337 return (backingMap
338 .get(commit.operation().key())
339 .addCommit(commit)) != null;
340 }
341 }
342
343 /**
344 * Handles a put commit, returns true if any change results from this
345 * commit.
346 * @param commit a put commit
347 * @return true if this commit results in a change, else false
348 */
349 protected boolean put(Commit<? extends Put> commit) {
350 if (commit.operation().values().isEmpty()) {
351 return false;
352 }
353 if (!backingMap.containsKey(commit.operation().key())) {
354 backingMap.put(commit.operation().key(),
355 new NonTransactionalCommit(1));
356 }
357 return backingMap
358 .get(commit.operation().key())
359 .addCommit(commit) != null;
360 }
361
362 protected Versioned<Collection<? extends byte[]>> replace(
363 Commit<? extends Replace> commit) {
364 if (!backingMap.containsKey(commit.operation().key())) {
365 backingMap.put(commit.operation().key(),
366 new NonTransactionalCommit(1));
367 }
368 return backingMap.get(commit.operation().key()).addCommit(commit);
369 }
370
371 @Override
372 public void register(ServerSession session) {
373 super.register(session);
374 }
375
376 @Override
377 public void unregister(ServerSession session) {
378 super.unregister(session);
379 }
380
381 @Override
382 public void expire(ServerSession session) {
383 super.expire(session);
384 }
385
386 @Override
387 public void close(ServerSession session) {
388 super.close(session);
389 }
390
391 private interface MapEntryValue {
392
393 /**
394 * Returns the list of raw {@code byte[]'s}.
395 *
396 * @return list of raw values
397 */
398 Collection<? extends byte[]> values();
399
400 /**
401 * Returns the version of the value.
402 *
403 * @return version
404 */
405 long version();
406
407 /**
408 * Discards the value by invoke appropriate clean up actions.
409 */
410 void discard();
411
412 /**
413 * Add a new commit and modifies the set of values accordingly.
414 * In the case of a replace or removeAll it returns the set of removed
415 * values. In the case of put or multiRemove it returns null for no
416 * change and a set of the added or removed values respectively if a
417 * change resulted.
418 *
419 * @param commit the commit to be added
420 */
421 Versioned<Collection<? extends byte[]>> addCommit(
422 Commit<? extends MultimapCommand> commit);
423 }
424
425 private class NonTransactionalCommit implements MapEntryValue {
426 private long version;
427 private final TreeMap<byte[], CountDownCompleter<Commit>>
428 valueCountdownMap = Maps.newTreeMap(new ByteArrayComparator());
429 /*This is a mapping of commits that added values to the commits
430 * removing those values, they will not be circular because keys will
431 * be exclusively Put and Replace commits and values will be exclusively
432 * Multiremove commits, each time a Put or replace is removed it should
433 * as part of closing go through and countdown each of the remove
434 * commits depending on it.*/
435 private final HashMultimap<Commit, CountDownCompleter<Commit>>
436 additiveToRemovalCommits = HashMultimap.create();
437
438 public NonTransactionalCommit(
439 long version) {
440 //Set the version to current it will only be updated once this is
441 // populated
442 this.version = globalVersion.get();
443 }
444
445 @Override
446 public Collection<? extends byte[]> values() {
447 return valueCountdownMap.keySet();
448 }
449
450 @Override
451 public long version() {
452 return version;
453 }
454
455 @Override
456 public void discard() {
457 valueCountdownMap.values().forEach(completer ->
458 completer.object().close());
459 }
460
461 @Override
462 public Versioned<Collection<? extends byte[]>> addCommit(
463 Commit<? extends MultimapCommand> commit) {
464 Preconditions.checkNotNull(commit);
465 Preconditions.checkNotNull(commit.operation());
466 Versioned<Collection<? extends byte[]>> retVersion;
467
468 if (commit.operation() instanceof Put) {
469 //Using a treeset here sanitizes the input, removing duplicates
470 Set<byte[]> valuesToAdd =
471 Sets.newTreeSet(new ByteArrayComparator());
472 ((Put) commit.operation()).values().forEach(value -> {
473 if (!valueCountdownMap.containsKey(value)) {
474 valuesToAdd.add(value);
475 }
476 });
477 if (valuesToAdd.isEmpty()) {
478 //Do not increment or add the commit if no change resulted
479// TODO fairly sure the below case is unreachable but
480// TODO need to make sure
481// if (valueCountdownMap.isEmpty()) {
482// backingMap.remove(((Put) commit.operation()).key());
483// }
484 commit.close();
485 return null;
486 }
487 //When all values from a commit have been removed decrement all
488 //removal commits relying on it and remove itself from the
489 //mapping of additive commits to the commits removing the
490 //values it added. (Only multiremoves will be dependent)
491 CountDownCompleter<Commit> completer =
492 new CountDownCompleter<>(commit, valuesToAdd.size(),
493 c -> {
494 if (additiveToRemovalCommits.containsKey(c)) {
495 additiveToRemovalCommits.
496 get(c).
497 forEach(countdown ->
498 countdown.countDown());
499 additiveToRemovalCommits.removeAll(c);
500 }
501 c.close();
502 });
503 retVersion = new Versioned<>(valuesToAdd, version);
504 valuesToAdd.forEach(value -> valueCountdownMap.put(value,
505 completer));
506 version++;
507 return retVersion;
508
509 } else if (commit.operation() instanceof Replace) {
510 //Will this work?? Need to check before check-in!
511 Set<byte[]> removedValues = Sets.newHashSet();
512 removedValues.addAll(valueCountdownMap.keySet());
513 retVersion = new Versioned<>(removedValues, version);
514 valueCountdownMap.values().forEach(countdown ->
515 countdown.countDown());
516 valueCountdownMap.clear();
517 Set<byte[]> valuesToAdd =
518 Sets.newTreeSet(new ByteArrayComparator());
519 ((Replace) commit.operation()).values().forEach(value -> {
520 valuesToAdd.add(value);
521 });
522 if (valuesToAdd.isEmpty()) {
523 version = globalVersion.incrementAndGet();
524 backingMap.remove(((Replace) commit.operation()).key());
525 //Order is important here, the commit must be closed last
526 //(or minimally after all uses)
527 commit.close();
528 return retVersion;
529 }
530 CountDownCompleter<Commit> completer =
531 new CountDownCompleter<>(commit, valuesToAdd.size(),
532 c -> {
533 if (additiveToRemovalCommits
534 .containsKey(c)) {
535 additiveToRemovalCommits.
536 get(c).
537 forEach(countdown ->
538 countdown.countDown());
539 additiveToRemovalCommits.
540 removeAll(c);
541 }
542 c.close();
543 });
544 valuesToAdd.forEach(value ->
545 valueCountdownMap.put(value, completer));
546 version = globalVersion.incrementAndGet();
547 return retVersion;
548
549 } else if (commit.operation() instanceof RemoveAll) {
550 Set<byte[]> removed = Sets.newHashSet();
551 //We can assume here that values only appear once and so we
552 //do not need to sanitize the return for duplicates.
553 removed.addAll(valueCountdownMap.keySet());
554 retVersion = new Versioned<>(removed, version);
555 valueCountdownMap.values().forEach(countdown ->
556 countdown.countDown());
557 valueCountdownMap.clear();
558 //In the case of a removeAll all commits will be removed and
559 //unlike the multiRemove case we do not need to consider
560 //dependencies among additive and removal commits.
561
562 //Save the key for use after the commit is closed
563 String key = ((RemoveAll) commit.operation()).key();
564 commit.close();
565 version = globalVersion.incrementAndGet();
566 backingMap.remove(key);
567 return retVersion;
568
569 } else if (commit.operation() instanceof MultiRemove) {
570 //Must first calculate how many commits the removal depends on.
571 //At this time we also sanitize the removal set by adding to a
572 //set with proper handling of byte[] equality.
573 Set<byte[]> removed = Sets.newHashSet();
574 Set<Commit> commitsRemovedFrom = Sets.newHashSet();
575 ((MultiRemove) commit.operation()).values().forEach(value -> {
576 if (valueCountdownMap.containsKey(value)) {
577 removed.add(value);
578 commitsRemovedFrom
579 .add(valueCountdownMap.get(value).object());
580 }
581 });
582 //If there is nothing to be removed no action should be taken.
583 if (removed.isEmpty()) {
584 //Do not increment or add the commit if no change resulted
585 commit.close();
586 return null;
587 }
588 //When all additive commits this depends on are closed this can
589 //be closed as well.
590 CountDownCompleter<Commit> completer =
591 new CountDownCompleter<>(commit,
592 commitsRemovedFrom.size(),
593 c -> c.close());
594 commitsRemovedFrom.forEach(commitRemovedFrom -> {
595 additiveToRemovalCommits.put(commitRemovedFrom, completer);
596 });
597 //Save key in case countdown results in closing the commit.
598 String removedKey = ((MultiRemove) commit.operation()).key();
599 removed.forEach(removedValue -> {
600 valueCountdownMap.remove(removedValue).countDown();
601 });
602 //The version is updated locally as well as globally even if
603 //this object will be removed from the map in case any other
604 //party still holds a reference to this object.
605 retVersion = new Versioned<>(removed, version);
606 version = globalVersion.incrementAndGet();
607 if (valueCountdownMap.isEmpty()) {
608 backingMap
609 .remove(removedKey);
610 }
611 return retVersion;
612
613 } else {
614 throw new IllegalArgumentException();
615 }
616 }
617 }
618
619 /**
620 * A collector that creates MapEntryValues and creates a multiset of all
621 * values in the map an equal number of times to the number of sets in
622 * which they participate.
623 */
624 private class HashMultisetValueCollector implements
625 Collector<MapEntryValue,
626 HashMultiset<byte[]>,
627 HashMultiset<byte[]>> {
628 private HashMultiset<byte[]> multiset = null;
629
630 @Override
631 public Supplier<HashMultiset<byte[]>> supplier() {
632 return new Supplier<HashMultiset<byte[]>>() {
633 @Override
634 public HashMultiset<byte[]> get() {
635 if (multiset == null) {
636 multiset = HashMultiset.create();
637 }
638 return multiset;
639 }
640 };
641 }
642
643 @Override
644 public BiConsumer<HashMultiset<byte[]>, MapEntryValue> accumulator() {
645 return (multiset, mapEntryValue) ->
646 multiset.addAll(mapEntryValue.values());
647 }
648
649 @Override
650 public BinaryOperator<HashMultiset<byte[]>> combiner() {
651 return (setOne, setTwo) -> {
652 setOne.addAll(setTwo);
653 return setOne;
654 };
655 }
656
657 @Override
658 public Function<HashMultiset<byte[]>,
659 HashMultiset<byte[]>> finisher() {
660 return (unused) -> multiset;
661 }
662
663 @Override
664 public Set<Characteristics> characteristics() {
665 return EnumSet.of(Characteristics.UNORDERED);
666 }
667 }
668
669 /**
670 * A collector that creates Entries of {@code <String, MapEntryValue>} and
671 * creates a set of entries all key value pairs in the map.
672 */
673 private class EntrySetCollector implements
674 Collector<Map.Entry<String, MapEntryValue>,
675 Set<Map.Entry<String, byte[]>>,
676 Set<Map.Entry<String, byte[]>>> {
677 private Set<Map.Entry<String, byte[]>> set = null;
678
679 @Override
680 public Supplier<Set<Map.Entry<String, byte[]>>> supplier() {
681 return new Supplier<Set<Map.Entry<String, byte[]>>>() {
682 @Override
683 public Set<Map.Entry<String, byte[]>> get() {
684 if (set == null) {
685 set = Sets.newHashSet();
686 }
687 return set;
688 }
689 };
690 }
691
692 @Override
693 public BiConsumer<Set<Map.Entry<String, byte[]>>,
694 Map.Entry<String, MapEntryValue>> accumulator() {
695 return (set, entry) -> {
696 entry
697 .getValue()
698 .values()
699 .forEach(byteValue ->
700 set.add(Maps.immutableEntry(entry.getKey(),
701 byteValue)));
702 };
703 }
704
705 @Override
706 public BinaryOperator<Set<Map.Entry<String, byte[]>>> combiner() {
707 return (setOne, setTwo) -> {
708 setOne.addAll(setTwo);
709 return setOne;
710 };
711 }
712
713 @Override
714 public Function<Set<Map.Entry<String, byte[]>>,
715 Set<Map.Entry<String, byte[]>>> finisher() {
716 return (unused) -> set;
717 }
718
719 @Override
720 public Set<Characteristics> characteristics() {
721 return EnumSet.of(Characteristics.UNORDERED);
722 }
723 }
724 /**
725 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
726 * @param value map entry value
727 * @return versioned instance or an empty list versioned -1 if argument is
728 * null
729 */
730 private Versioned<Collection<? extends byte[]>> toVersioned(
731 MapEntryValue value) {
732 return value == null ? new Versioned<>(Lists.newArrayList(), -1) :
733 new Versioned<>(value.values(),
734 value.version());
735 }
736
737 private class ByteArrayComparator implements Comparator<byte[]> {
738
739 @Override
740 public int compare(byte[] o1, byte[] o2) {
741 if (Arrays.equals(o1, o2)) {
742 return 0;
743 } else {
744 for (int i = 0; i < o1.length && i < o2.length; i++) {
745 if (o1[i] < o2[i]) {
746 return -1;
747 } else if (o1[i] > o2[i]) {
748 return 1;
749 }
750 }
751 return o1.length > o2.length ? 1 : -1;
752 }
753 }
754 }
755}