blob: 6c864b016f97f3326c4ea38f44a6b7d2553684fe [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.resources.impl;
18
Jordan Halterman15f33712018-06-21 00:00:15 -070019import java.util.ArrayList;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070020import java.util.Arrays;
21import java.util.Collection;
22import java.util.Comparator;
23import java.util.EnumSet;
24import java.util.HashMap;
Jordan Halterman15f33712018-06-21 00:00:15 -070025import java.util.Iterator;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070026import java.util.LinkedHashMap;
27import java.util.List;
28import java.util.Map;
29import java.util.Set;
30import java.util.TreeSet;
31import java.util.concurrent.atomic.AtomicLong;
32import java.util.function.BiConsumer;
33import java.util.function.BinaryOperator;
34import java.util.function.Function;
35import java.util.function.Supplier;
36import java.util.stream.Collector;
37import java.util.stream.Collectors;
38
39import com.esotericsoftware.kryo.Kryo;
40import com.esotericsoftware.kryo.io.Input;
41import com.esotericsoftware.kryo.io.Output;
42import com.google.common.base.Preconditions;
43import com.google.common.collect.HashMultiset;
44import com.google.common.collect.ImmutableSet;
45import com.google.common.collect.Lists;
46import com.google.common.collect.Maps;
47import com.google.common.collect.Multiset;
48import com.google.common.collect.Sets;
49import io.atomix.protocols.raft.service.AbstractRaftService;
50import io.atomix.protocols.raft.service.Commit;
51import io.atomix.protocols.raft.service.RaftServiceExecutor;
52import io.atomix.protocols.raft.session.RaftSession;
53import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
54import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
55import org.onlab.util.KryoNamespace;
56import org.onlab.util.Match;
57import org.onosproject.store.serializers.KryoNamespaces;
58import org.onosproject.store.service.MultimapEvent;
59import org.onosproject.store.service.Serializer;
60import org.onosproject.store.service.Versioned;
61
62import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
Jordan Halterman15f33712018-06-21 00:00:15 -070065import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLOSE_ITERATOR;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070066import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsEntry;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsKey;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsValue;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ENTRIES;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
75import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
Jordan Halterman15f33712018-06-21 00:00:15 -070076import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorBatch;
77import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorPosition;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070078import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
79import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
80import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
81import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
Jordan Halterman15f33712018-06-21 00:00:15 -070082import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
83import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070084import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
Jordan Halterman8c57a092018-06-04 14:53:06 -070085import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070086import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
87import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
88import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
Jordan Halterman8c57a092018-06-04 14:53:06 -070089import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070090import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
91import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
92import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
93import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Replace;
94import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.SIZE;
95import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.VALUES;
96
/**
 * State Machine for {@link AtomixConsistentSetMultimap} resource.
 */
100public class AtomixConsistentSetMultimapService extends AbstractRaftService {
Jordan Halterman15f33712018-06-21 00:00:15 -0700101 private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700102
103 private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
104 .register(KryoNamespaces.BASIC)
105 .register(AtomixConsistentSetMultimapOperations.NAMESPACE)
106 .register(AtomixConsistentSetMultimapEvents.NAMESPACE)
107 .register(ByteArrayComparator.class)
108 .register(new HashMap().keySet().getClass())
109 .register(TreeSet.class)
110 .register(new com.esotericsoftware.kryo.Serializer<NonTransactionalCommit>() {
111 @Override
112 public void write(Kryo kryo, Output output, NonTransactionalCommit object) {
113 kryo.writeClassAndObject(output, object.valueSet);
114 }
115
116 @Override
117 @SuppressWarnings("unchecked")
118 public NonTransactionalCommit read(Kryo kryo, Input input, Class<NonTransactionalCommit> type) {
119 NonTransactionalCommit commit = new NonTransactionalCommit();
120 commit.valueSet.addAll((Collection<byte[]>) kryo.readClassAndObject(input));
121 return commit;
122 }
123 }, NonTransactionalCommit.class)
124 .build());
125
126 private AtomicLong globalVersion = new AtomicLong(1);
127 private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
Jordan Halterman15f33712018-06-21 00:00:15 -0700128 private Map<String, MapEntryValue> backingMap = Maps.newConcurrentMap();
129 private Map<Long, IteratorContext> iterators = Maps.newHashMap();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700130
131 @Override
132 public void snapshot(SnapshotWriter writer) {
133 writer.writeLong(globalVersion.get());
134 writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer::encode);
135 writer.writeObject(backingMap, serializer::encode);
Jordan Halterman15f33712018-06-21 00:00:15 -0700136
137 Map<Long, Long> iterators = Maps.newHashMap();
138 this.iterators.forEach((id, context) -> iterators.put(id, context.sessionId));
139 writer.writeObject(iterators, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700140 }
141
142 @Override
143 public void install(SnapshotReader reader) {
144 globalVersion = new AtomicLong(reader.readLong());
145
146 listeners = new LinkedHashMap<>();
147 for (Long sessionId : reader.<Set<Long>>readObject(serializer::decode)) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700148 listeners.put(sessionId, sessions().getSession(sessionId));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700149 }
150
151 backingMap = reader.readObject(serializer::decode);
Jordan Halterman15f33712018-06-21 00:00:15 -0700152
153 Map<Long, Long> iterators = reader.readObject(serializer::decode);
154 this.iterators = Maps.newHashMap();
155 iterators.forEach((id, session) ->
156 this.iterators.put(id, new IteratorContext(session, backingMap.entrySet().iterator())));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700157 }
158
159 @Override
160 protected void configure(RaftServiceExecutor executor) {
161 executor.register(SIZE, this::size, serializer::encode);
162 executor.register(IS_EMPTY, this::isEmpty, serializer::encode);
163 executor.register(CONTAINS_KEY, serializer::decode, this::containsKey, serializer::encode);
164 executor.register(CONTAINS_VALUE, serializer::decode, this::containsValue, serializer::encode);
165 executor.register(CONTAINS_ENTRY, serializer::decode, this::containsEntry, serializer::encode);
166 executor.register(CLEAR, this::clear);
167 executor.register(KEY_SET, this::keySet, serializer::encode);
168 executor.register(KEYS, this::keys, serializer::encode);
169 executor.register(VALUES, this::values, serializer::encode);
170 executor.register(ENTRIES, this::entries, serializer::encode);
171 executor.register(GET, serializer::decode, this::get, serializer::encode);
172 executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode);
173 executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode);
Jordan Halterman8c57a092018-06-04 14:53:06 -0700174 executor.register(REMOVE_AND_GET, serializer::decode, this::removeAndGet, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700175 executor.register(PUT, serializer::decode, this::put, serializer::encode);
Jordan Halterman8c57a092018-06-04 14:53:06 -0700176 executor.register(PUT_AND_GET, serializer::decode, this::putAndGet, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700177 executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
178 executor.register(ADD_LISTENER, this::listen);
179 executor.register(REMOVE_LISTENER, this::unlisten);
Jordan Halterman15f33712018-06-21 00:00:15 -0700180 executor.register(OPEN_ITERATOR, this::openIterator, serializer::encode);
181 executor.register(NEXT, serializer::decode, this::next, serializer::encode);
182 executor.register(CLOSE_ITERATOR, serializer::decode, this::closeIterator);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700183 }
184
185 @Override
186 public void onExpire(RaftSession session) {
187 listeners.remove(session.sessionId().id());
Jordan Halterman15f33712018-06-21 00:00:15 -0700188 iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700189 }
190
191 @Override
192 public void onClose(RaftSession session) {
193 listeners.remove(session.sessionId().id());
Jordan Halterman15f33712018-06-21 00:00:15 -0700194 iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700195 }
196
197 /**
198 * Handles a Size commit.
199 *
200 * @param commit Size commit
201 * @return number of unique key value pairs in the multimap
202 */
203 protected int size(Commit<Void> commit) {
204 return backingMap.values()
205 .stream()
206 .map(valueCollection -> valueCollection.values().size())
207 .collect(Collectors.summingInt(size -> size));
208 }
209
210 /**
211 * Handles an IsEmpty commit.
212 *
213 * @param commit IsEmpty commit
214 * @return true if the multimap contains no key-value pairs, else false
215 */
216 protected boolean isEmpty(Commit<Void> commit) {
217 return backingMap.isEmpty();
218 }
219
220 /**
221 * Handles a contains key commit.
222 *
223 * @param commit ContainsKey commit
224 * @return returns true if the key is in the multimap, else false
225 */
226 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
227 return backingMap.containsKey(commit.value().key());
228 }
229
230 /**
231 * Handles a ContainsValue commit.
232 *
233 * @param commit ContainsValue commit
234 * @return true if the value is in the multimap, else false
235 */
236 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
237 if (backingMap.values().isEmpty()) {
238 return false;
239 }
240 Match<byte[]> match = Match.ifValue(commit.value().value());
241 return backingMap
242 .values()
243 .stream()
244 .anyMatch(valueList ->
245 valueList
246 .values()
247 .stream()
248 .anyMatch(byteValue ->
249 match.matches(byteValue)));
250 }
251
252 /**
253 * Handles a ContainsEntry commit.
254 *
255 * @param commit ContainsEntry commit
256 * @return true if the key-value pair exists, else false
257 */
258 protected boolean containsEntry(Commit<? extends ContainsEntry> commit) {
259 MapEntryValue entryValue =
260 backingMap.get(commit.value().key());
261 if (entryValue == null) {
262 return false;
263 } else {
264 Match valueMatch = Match.ifValue(commit.value().value());
265 return entryValue
266 .values()
267 .stream()
268 .anyMatch(byteValue -> valueMatch.matches(byteValue));
269 }
270 }
271
272 /**
273 * Handles a Clear commit.
274 *
275 * @param commit Clear commit
276 */
277 protected void clear(Commit<Void> commit) {
278 backingMap.clear();
279 }
280
281 /**
282 * Handles a KeySet commit.
283 *
284 * @param commit KeySet commit
285 * @return a set of all keys in the multimap
286 */
287 protected Set<String> keySet(Commit<Void> commit) {
288 return ImmutableSet.copyOf(backingMap.keySet());
289 }
290
291 /**
292 * Handles a Keys commit.
293 *
294 * @param commit Keys commit
295 * @return a multiset of keys with each key included an equal number of
296 * times to the total key-value pairs in which that key participates
297 */
298 protected Multiset<String> keys(Commit<Void> commit) {
299 Multiset keys = HashMultiset.create();
300 backingMap.forEach((key, mapEntryValue) -> {
301 keys.add(key, mapEntryValue.values().size());
302 });
303 return keys;
304 }
305
306 /**
307 * Handles a Values commit.
308 *
309 * @param commit Values commit
310 * @return the set of values in the multimap with duplicates included
311 */
312 protected Multiset<byte[]> values(Commit<Void> commit) {
313 return backingMap
314 .values()
315 .stream()
316 .collect(new HashMultisetValueCollector());
317 }
318
319 /**
320 * Handles an Entries commit.
321 *
322 * @param commit Entries commit
323 * @return a set of all key-value pairs in the multimap
324 */
325 protected Collection<Map.Entry<String, byte[]>> entries(Commit<Void> commit) {
326 return backingMap
327 .entrySet()
328 .stream()
329 .collect(new EntrySetCollector());
330 }
331
332 /**
333 * Handles a Get commit.
334 *
335 * @param commit Get commit
336 * @return the collection of values associated with the key or an empty
337 * list if none exist
338 */
339 protected Versioned<Collection<? extends byte[]>> get(Commit<? extends Get> commit) {
340 return toVersioned(backingMap.get(commit.value().key()));
341 }
342
343 /**
344 * Handles a removeAll commit, and returns the previous mapping.
345 *
346 * @param commit removeAll commit
347 * @return collection of removed values
348 */
349 protected Versioned<Collection<? extends byte[]>> removeAll(Commit<? extends RemoveAll> commit) {
350 String key = commit.value().key();
351
352 if (!backingMap.containsKey(key)) {
353 return new Versioned<>(Sets.newHashSet(), -1);
354 }
355
356 Versioned<Collection<? extends byte[]>> removedValues =
357 backingMap.get(key).addCommit(commit);
358 publish(removedValues.value().stream()
359 .map(value -> new MultimapEvent<String, byte[]>(
360 "", key, null, value))
361 .collect(Collectors.toList()));
362 return removedValues;
363 }
364
365 /**
366 * Handles a multiRemove commit, returns true if the remove results in any
367 * change.
368 * @param commit multiRemove commit
369 * @return true if any change results, else false
370 */
371 protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
372 String key = commit.value().key();
373
374 if (!backingMap.containsKey(key)) {
375 return false;
376 }
377
378 Versioned<Collection<? extends byte[]>> removedValues = backingMap
Jordan Halterman8c57a092018-06-04 14:53:06 -0700379 .get(key)
380 .addCommit(commit);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700381
382 if (removedValues != null) {
383 if (removedValues.value().isEmpty()) {
384 backingMap.remove(key);
385 }
386
387 publish(removedValues.value().stream()
Jordan Halterman8c57a092018-06-04 14:53:06 -0700388 .map(value -> new MultimapEvent<String, byte[]>(
389 "", key, null, value))
390 .collect(Collectors.toList()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700391 return true;
392 }
393
394 return false;
395 }
396
397 /**
Jordan Halterman8c57a092018-06-04 14:53:06 -0700398 * Handles a removeAndGet commit.
399 *
400 * @param commit multiRemove commit
401 * @return the updated values or null if the values are empty
402 */
403 protected Versioned<Collection<? extends byte[]>> removeAndGet(Commit<? extends MultiRemove> commit) {
404 String key = commit.value().key();
405
406 if (!backingMap.containsKey(key)) {
407 return null;
408 }
409
410 Versioned<Collection<? extends byte[]>> removedValues = backingMap
411 .get(key)
412 .addCommit(commit);
413
414 if (removedValues != null) {
415 if (removedValues.value().isEmpty()) {
416 backingMap.remove(key);
417 }
418
419 publish(removedValues.value().stream()
420 .map(value -> new MultimapEvent<String, byte[]>(
421 "", key, null, value))
422 .collect(Collectors.toList()));
423 }
424
425 return toVersioned(backingMap.get(key));
426 }
427
428 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700429 * Handles a put commit, returns true if any change results from this
430 * commit.
431 * @param commit a put commit
432 * @return true if this commit results in a change, else false
433 */
434 protected boolean put(Commit<? extends Put> commit) {
435 String key = commit.value().key();
436 if (commit.value().values().isEmpty()) {
437 return false;
438 }
439 if (!backingMap.containsKey(key)) {
440 backingMap.put(key, new NonTransactionalCommit());
441 }
442
443 Versioned<Collection<? extends byte[]>> addedValues = backingMap
Jordan Halterman8c57a092018-06-04 14:53:06 -0700444 .get(key)
445 .addCommit(commit);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700446
447 if (addedValues != null) {
448 publish(addedValues.value().stream()
Jordan Halterman8c57a092018-06-04 14:53:06 -0700449 .map(value -> new MultimapEvent<String, byte[]>(
450 "", key, value, null))
451 .collect(Collectors.toList()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700452 return true;
453 }
454
455 return false;
456 }
457
Jordan Halterman8c57a092018-06-04 14:53:06 -0700458 /**
459 * Handles a putAndGet commit.
460 *
461 * @param commit a put commit
462 * @return the updated values
463 */
464 protected Versioned<Collection<? extends byte[]>> putAndGet(Commit<? extends Put> commit) {
465 String key = commit.value().key();
466 if (commit.value().values().isEmpty()) {
467 return null;
468 }
469 if (!backingMap.containsKey(key)) {
470 backingMap.put(key, new NonTransactionalCommit());
471 }
472
473 Versioned<Collection<? extends byte[]>> addedValues = backingMap
474 .get(key)
475 .addCommit(commit);
476
477 if (addedValues != null) {
478 publish(addedValues.value().stream()
479 .map(value -> new MultimapEvent<String, byte[]>(
480 "", key, value, null))
481 .collect(Collectors.toList()));
482 }
483
484 return toVersioned(backingMap.get(key));
485 }
486
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700487 protected Versioned<Collection<? extends byte[]>> replace(
488 Commit<? extends Replace> commit) {
489 if (!backingMap.containsKey(commit.value().key())) {
490 backingMap.put(commit.value().key(),
491 new NonTransactionalCommit());
492 }
493 return backingMap.get(commit.value().key()).addCommit(commit);
494 }
495
496 /**
497 * Handles a listen commit.
498 *
499 * @param commit listen commit
500 */
501 protected void listen(Commit<Void> commit) {
502 listeners.put(commit.session().sessionId().id(), commit.session());
503 }
504
505 /**
506 * Handles an unlisten commit.
507 *
508 * @param commit unlisten commit
509 */
510 protected void unlisten(Commit<Void> commit) {
511 listeners.remove(commit.session().sessionId().id());
512 }
513
514 /**
Jordan Halterman15f33712018-06-21 00:00:15 -0700515 * Handles an open iterator commit.
Jordan Halterman5e884352018-05-21 22:11:07 -0700516 *
Jordan Halterman15f33712018-06-21 00:00:15 -0700517 * @param commit the open iterator commit
518 * @return iterator identifier
Jordan Halterman5e884352018-05-21 22:11:07 -0700519 */
Jordan Halterman15f33712018-06-21 00:00:15 -0700520 protected long openIterator(Commit<Void> commit) {
521 iterators.put(commit.index(), new IteratorContext(
522 commit.session().sessionId().id(),
523 backingMap.entrySet().iterator()));
524 return commit.index();
525 }
526
527 /**
528 * Handles an iterator next commit.
529 *
530 * @param commit the next commit
531 * @return a list of entries to iterate
532 */
533 protected IteratorBatch next(Commit<IteratorPosition> commit) {
534 final long iteratorId = commit.value().iteratorId();
535 final int position = commit.value().position();
536
537 IteratorContext context = iterators.get(iteratorId);
538 if (context == null) {
539 return null;
540 }
541
542 List<Map.Entry<String, byte[]>> entries = new ArrayList<>();
543 int size = 0;
544 while (context.iterator.hasNext()) {
545 context.position++;
546 if (context.position > position) {
547 Map.Entry<String, MapEntryValue> entry = context.iterator.next();
548 String key = entry.getKey();
549 int keySize = key.length();
550 for (byte[] value : entry.getValue().values()) {
551 entries.add(Maps.immutableEntry(key, value));
552 size += keySize;
553 size += value.length;
554 }
555
556 if (size >= MAX_ITERATOR_BATCH_SIZE) {
557 break;
558 }
Jordan Halterman5e884352018-05-21 22:11:07 -0700559 }
560 }
Jordan Halterman15f33712018-06-21 00:00:15 -0700561
562 if (entries.isEmpty()) {
563 return null;
564 }
565 return new IteratorBatch(context.position, entries);
566 }
567
568 /**
569 * Handles a close iterator commit.
570 *
571 * @param commit the close iterator commit
572 */
573 protected void closeIterator(Commit<Long> commit) {
574 iterators.remove(commit.value());
Jordan Halterman5e884352018-05-21 22:11:07 -0700575 }
576
577 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700578 * Publishes events to listeners.
579 *
580 * @param events list of map event to publish
581 */
582 private void publish(List<MultimapEvent<String, byte[]>> events) {
583 listeners.values().forEach(session -> session.publish(CHANGE, serializer::encode, events));
584 }
585
586 private interface MapEntryValue {
587
588 /**
589 * Returns the list of raw {@code byte[]'s}.
590 *
591 * @return list of raw values
592 */
593 Collection<? extends byte[]> values();
594
595 /**
596 * Returns the version of the value.
597 *
598 * @return version
599 */
600 long version();
601
602 /**
603 * Add a new commit and modifies the set of values accordingly.
604 * In the case of a replace or removeAll it returns the set of removed
605 * values. In the case of put or multiRemove it returns null for no
606 * change and a set of the added or removed values respectively if a
607 * change resulted.
608 *
609 * @param commit the commit to be added
610 */
611 Versioned<Collection<? extends byte[]>> addCommit(
612 Commit<? extends MultimapOperation> commit);
613 }
614
615 private class NonTransactionalCommit implements MapEntryValue {
616 private long version;
617 private final TreeSet<byte[]> valueSet = Sets.newTreeSet(new ByteArrayComparator());
618
619 public NonTransactionalCommit() {
620 //Set the version to current it will only be updated once this is
621 // populated
622 this.version = globalVersion.get();
623 }
624
625 @Override
626 public Collection<? extends byte[]> values() {
627 return ImmutableSet.copyOf(valueSet);
628 }
629
630 @Override
631 public long version() {
632 return version;
633 }
634
635 @Override
636 public Versioned<Collection<? extends byte[]>> addCommit(
637 Commit<? extends MultimapOperation> commit) {
638 Preconditions.checkNotNull(commit);
639 Preconditions.checkNotNull(commit.value());
640 Versioned<Collection<? extends byte[]>> retVersion;
641
642 if (commit.value() instanceof Put) {
643 //Using a treeset here sanitizes the input, removing duplicates
644 Set<byte[]> valuesToAdd =
645 Sets.newTreeSet(new ByteArrayComparator());
646 ((Put) commit.value()).values().forEach(value -> {
647 if (!valueSet.contains(value)) {
648 valuesToAdd.add(value);
649 }
650 });
651 if (valuesToAdd.isEmpty()) {
652 //Do not increment or add the commit if no change resulted
653 return null;
654 }
655 retVersion = new Versioned<>(valuesToAdd, version);
656 valuesToAdd.forEach(value -> valueSet.add(value));
657 version++;
658 return retVersion;
659
660 } else if (commit.value() instanceof Replace) {
661 //Will this work?? Need to check before check-in!
662 Set<byte[]> removedValues = Sets.newHashSet();
663 removedValues.addAll(valueSet);
664 retVersion = new Versioned<>(removedValues, version);
665 valueSet.clear();
666 Set<byte[]> valuesToAdd =
667 Sets.newTreeSet(new ByteArrayComparator());
668 ((Replace) commit.value()).values().forEach(value -> {
669 valuesToAdd.add(value);
670 });
671 if (valuesToAdd.isEmpty()) {
672 version = globalVersion.incrementAndGet();
673 backingMap.remove(((Replace) commit.value()).key());
674 return retVersion;
675 }
676 valuesToAdd.forEach(value -> valueSet.add(value));
677 version = globalVersion.incrementAndGet();
678 return retVersion;
679
680 } else if (commit.value() instanceof RemoveAll) {
681 Set<byte[]> removed = Sets.newHashSet();
682 //We can assume here that values only appear once and so we
683 //do not need to sanitize the return for duplicates.
684 removed.addAll(valueSet);
685 retVersion = new Versioned<>(removed, version);
686 valueSet.clear();
687 //In the case of a removeAll all commits will be removed and
688 //unlike the multiRemove case we do not need to consider
689 //dependencies among additive and removal commits.
690
691 //Save the key for use after the commit is closed
692 String key = ((RemoveAll) commit.value()).key();
693 version = globalVersion.incrementAndGet();
694 backingMap.remove(key);
695 return retVersion;
696
697 } else if (commit.value() instanceof MultiRemove) {
698 //Must first calculate how many commits the removal depends on.
699 //At this time we also sanitize the removal set by adding to a
700 //set with proper handling of byte[] equality.
701 Set<byte[]> removed = Sets.newHashSet();
702 ((MultiRemove) commit.value()).values().forEach(value -> {
703 if (valueSet.contains(value)) {
704 removed.add(value);
705 }
706 });
707 //If there is nothing to be removed no action should be taken.
708 if (removed.isEmpty()) {
709 return null;
710 }
711 //Save key in case countdown results in closing the commit.
712 String removedKey = ((MultiRemove) commit.value()).key();
713 removed.forEach(removedValue -> {
714 valueSet.remove(removedValue);
715 });
716 //The version is updated locally as well as globally even if
717 //this object will be removed from the map in case any other
718 //party still holds a reference to this object.
719 retVersion = new Versioned<>(removed, version);
720 version = globalVersion.incrementAndGet();
721 if (valueSet.isEmpty()) {
722 backingMap.remove(removedKey);
723 }
724 return retVersion;
725
726 } else {
727 throw new IllegalArgumentException();
728 }
729 }
730 }
731
732 /**
733 * A collector that creates MapEntryValues and creates a multiset of all
734 * values in the map an equal number of times to the number of sets in
735 * which they participate.
736 */
737 private class HashMultisetValueCollector implements
738 Collector<MapEntryValue,
739 HashMultiset<byte[]>,
740 HashMultiset<byte[]>> {
741
742 @Override
743 public Supplier<HashMultiset<byte[]>> supplier() {
744 return HashMultiset::create;
745 }
746
747 @Override
748 public BiConsumer<HashMultiset<byte[]>, MapEntryValue> accumulator() {
749 return (multiset, mapEntryValue) ->
750 multiset.addAll(mapEntryValue.values());
751 }
752
753 @Override
754 public BinaryOperator<HashMultiset<byte[]>> combiner() {
755 return (setOne, setTwo) -> {
756 setOne.addAll(setTwo);
757 return setOne;
758 };
759 }
760
761 @Override
762 public Function<HashMultiset<byte[]>,
763 HashMultiset<byte[]>> finisher() {
764 return Function.identity();
765 }
766
767 @Override
768 public Set<Characteristics> characteristics() {
769 return EnumSet.of(Characteristics.UNORDERED);
770 }
771 }
772
773 /**
774 * A collector that creates Entries of {@code <String, MapEntryValue>} and
775 * creates a set of entries all key value pairs in the map.
776 */
777 private class EntrySetCollector implements
778 Collector<Map.Entry<String, MapEntryValue>,
779 Set<Map.Entry<String, byte[]>>,
780 Set<Map.Entry<String, byte[]>>> {
781 private Set<Map.Entry<String, byte[]>> set = null;
782
783 @Override
784 public Supplier<Set<Map.Entry<String, byte[]>>> supplier() {
785 return () -> {
786 if (set == null) {
787 set = Sets.newHashSet();
788 }
789 return set;
790 };
791 }
792
793 @Override
794 public BiConsumer<Set<Map.Entry<String, byte[]>>,
795 Map.Entry<String, MapEntryValue>> accumulator() {
796 return (set, entry) -> {
797 entry
798 .getValue()
799 .values()
800 .forEach(byteValue ->
801 set.add(Maps.immutableEntry(entry.getKey(),
802 byteValue)));
803 };
804 }
805
806 @Override
807 public BinaryOperator<Set<Map.Entry<String, byte[]>>> combiner() {
808 return (setOne, setTwo) -> {
809 setOne.addAll(setTwo);
810 return setOne;
811 };
812 }
813
814 @Override
815 public Function<Set<Map.Entry<String, byte[]>>,
816 Set<Map.Entry<String, byte[]>>> finisher() {
817 return (unused) -> set;
818 }
819
820 @Override
821 public Set<Characteristics> characteristics() {
822 return EnumSet.of(Characteristics.UNORDERED);
823 }
824 }
825 /**
826 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
827 * @param value map entry value
828 * @return versioned instance or an empty list versioned -1 if argument is
829 * null
830 */
831 private Versioned<Collection<? extends byte[]>> toVersioned(
832 MapEntryValue value) {
833 return value == null ? new Versioned<>(Lists.newArrayList(), -1) :
834 new Versioned<>(value.values(),
835 value.version());
836 }
837
838 private static class ByteArrayComparator implements Comparator<byte[]> {
839
840 @Override
841 public int compare(byte[] o1, byte[] o2) {
842 if (Arrays.equals(o1, o2)) {
843 return 0;
844 } else {
845 for (int i = 0; i < o1.length && i < o2.length; i++) {
846 if (o1[i] < o2[i]) {
847 return -1;
848 } else if (o1[i] > o2[i]) {
849 return 1;
850 }
851 }
852 return o1.length > o2.length ? 1 : -1;
853 }
854 }
855 }
Jordan Halterman15f33712018-06-21 00:00:15 -0700856
857 private static class IteratorContext {
858 private final long sessionId;
859 private final Iterator<Map.Entry<String, MapEntryValue>> iterator;
860 private int position = 0;
861
862 IteratorContext(long sessionId, Iterator<Map.Entry<String, MapEntryValue>> iterator) {
863 this.sessionId = sessionId;
864 this.iterator = iterator;
865 }
866 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700867}