blob: fd2e6766348cde21f80682dcb3f1a08dd748803a [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jordan Halterman2bf177c2017-06-29 01:49:08 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
Jordan Halterman15f33712018-06-21 00:00:15 -070019import java.util.ArrayList;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070020import java.util.Arrays;
21import java.util.Collection;
22import java.util.Comparator;
23import java.util.EnumSet;
24import java.util.HashMap;
Jordan Halterman15f33712018-06-21 00:00:15 -070025import java.util.Iterator;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070026import java.util.LinkedHashMap;
27import java.util.List;
28import java.util.Map;
29import java.util.Set;
30import java.util.TreeSet;
31import java.util.concurrent.atomic.AtomicLong;
32import java.util.function.BiConsumer;
33import java.util.function.BinaryOperator;
34import java.util.function.Function;
35import java.util.function.Supplier;
36import java.util.stream.Collector;
37import java.util.stream.Collectors;
38
39import com.esotericsoftware.kryo.Kryo;
40import com.esotericsoftware.kryo.io.Input;
41import com.esotericsoftware.kryo.io.Output;
42import com.google.common.base.Preconditions;
43import com.google.common.collect.HashMultiset;
44import com.google.common.collect.ImmutableSet;
45import com.google.common.collect.Lists;
46import com.google.common.collect.Maps;
47import com.google.common.collect.Multiset;
48import com.google.common.collect.Sets;
49import io.atomix.protocols.raft.service.AbstractRaftService;
50import io.atomix.protocols.raft.service.Commit;
51import io.atomix.protocols.raft.service.RaftServiceExecutor;
52import io.atomix.protocols.raft.session.RaftSession;
53import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
54import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
55import org.onlab.util.KryoNamespace;
56import org.onlab.util.Match;
57import org.onosproject.store.serializers.KryoNamespaces;
58import org.onosproject.store.service.MultimapEvent;
59import org.onosproject.store.service.Serializer;
60import org.onosproject.store.service.Versioned;
61
62import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
Jordan Halterman15f33712018-06-21 00:00:15 -070065import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLOSE_ITERATOR;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070066import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsEntry;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsKey;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsValue;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ENTRIES;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
75import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
Jordan Halterman15f33712018-06-21 00:00:15 -070076import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorBatch;
77import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorPosition;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070078import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
79import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
80import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
81import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
Jordan Halterman15f33712018-06-21 00:00:15 -070082import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
83import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070084import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
Jordan Halterman8c57a092018-06-04 14:53:06 -070085import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070086import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
87import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
88import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
Jordan Halterman8c57a092018-06-04 14:53:06 -070089import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070090import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
91import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
92import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
93import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Replace;
94import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.SIZE;
95import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.VALUES;
96
97/**
98 * State Machine for {@link AtomixConsistentSetMultimap} resource.
99 */
100public class AtomixConsistentSetMultimapService extends AbstractRaftService {
Jordan Halterman15f33712018-06-21 00:00:15 -0700101 private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700102
103 private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
104 .register(KryoNamespaces.BASIC)
105 .register(AtomixConsistentSetMultimapOperations.NAMESPACE)
106 .register(AtomixConsistentSetMultimapEvents.NAMESPACE)
107 .register(ByteArrayComparator.class)
108 .register(new HashMap().keySet().getClass())
109 .register(TreeSet.class)
110 .register(new com.esotericsoftware.kryo.Serializer<NonTransactionalCommit>() {
111 @Override
112 public void write(Kryo kryo, Output output, NonTransactionalCommit object) {
113 kryo.writeClassAndObject(output, object.valueSet);
114 }
115
116 @Override
117 @SuppressWarnings("unchecked")
118 public NonTransactionalCommit read(Kryo kryo, Input input, Class<NonTransactionalCommit> type) {
119 NonTransactionalCommit commit = new NonTransactionalCommit();
120 commit.valueSet.addAll((Collection<byte[]>) kryo.readClassAndObject(input));
121 return commit;
122 }
123 }, NonTransactionalCommit.class)
124 .build());
125
126 private AtomicLong globalVersion = new AtomicLong(1);
127 private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
Jordan Halterman15f33712018-06-21 00:00:15 -0700128 private Map<String, MapEntryValue> backingMap = Maps.newConcurrentMap();
129 private Map<Long, IteratorContext> iterators = Maps.newHashMap();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700130
131 @Override
132 public void snapshot(SnapshotWriter writer) {
133 writer.writeLong(globalVersion.get());
134 writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer::encode);
135 writer.writeObject(backingMap, serializer::encode);
Jordan Halterman15f33712018-06-21 00:00:15 -0700136
137 Map<Long, Long> iterators = Maps.newHashMap();
138 this.iterators.forEach((id, context) -> iterators.put(id, context.sessionId));
139 writer.writeObject(iterators, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700140 }
141
142 @Override
143 public void install(SnapshotReader reader) {
144 globalVersion = new AtomicLong(reader.readLong());
145
146 listeners = new LinkedHashMap<>();
147 for (Long sessionId : reader.<Set<Long>>readObject(serializer::decode)) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700148 listeners.put(sessionId, sessions().getSession(sessionId));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700149 }
150
151 backingMap = reader.readObject(serializer::decode);
Jordan Halterman15f33712018-06-21 00:00:15 -0700152
153 Map<Long, Long> iterators = reader.readObject(serializer::decode);
154 this.iterators = Maps.newHashMap();
155 iterators.forEach((id, session) ->
156 this.iterators.put(id, new IteratorContext(session, backingMap.entrySet().iterator())));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700157 }
158
159 @Override
160 protected void configure(RaftServiceExecutor executor) {
161 executor.register(SIZE, this::size, serializer::encode);
162 executor.register(IS_EMPTY, this::isEmpty, serializer::encode);
163 executor.register(CONTAINS_KEY, serializer::decode, this::containsKey, serializer::encode);
164 executor.register(CONTAINS_VALUE, serializer::decode, this::containsValue, serializer::encode);
165 executor.register(CONTAINS_ENTRY, serializer::decode, this::containsEntry, serializer::encode);
166 executor.register(CLEAR, this::clear);
167 executor.register(KEY_SET, this::keySet, serializer::encode);
168 executor.register(KEYS, this::keys, serializer::encode);
169 executor.register(VALUES, this::values, serializer::encode);
170 executor.register(ENTRIES, this::entries, serializer::encode);
171 executor.register(GET, serializer::decode, this::get, serializer::encode);
172 executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode);
173 executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode);
Jordan Halterman8c57a092018-06-04 14:53:06 -0700174 executor.register(REMOVE_AND_GET, serializer::decode, this::removeAndGet, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700175 executor.register(PUT, serializer::decode, this::put, serializer::encode);
Jordan Halterman8c57a092018-06-04 14:53:06 -0700176 executor.register(PUT_AND_GET, serializer::decode, this::putAndGet, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700177 executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
178 executor.register(ADD_LISTENER, this::listen);
179 executor.register(REMOVE_LISTENER, this::unlisten);
Jordan Halterman15f33712018-06-21 00:00:15 -0700180 executor.register(OPEN_ITERATOR, this::openIterator, serializer::encode);
181 executor.register(NEXT, serializer::decode, this::next, serializer::encode);
182 executor.register(CLOSE_ITERATOR, serializer::decode, this::closeIterator);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700183 }
184
185 @Override
186 public void onExpire(RaftSession session) {
187 listeners.remove(session.sessionId().id());
Jordan Halterman15f33712018-06-21 00:00:15 -0700188 iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700189 }
190
191 @Override
192 public void onClose(RaftSession session) {
193 listeners.remove(session.sessionId().id());
Jordan Halterman15f33712018-06-21 00:00:15 -0700194 iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700195 }
196
197 /**
198 * Handles a Size commit.
199 *
200 * @param commit Size commit
201 * @return number of unique key value pairs in the multimap
202 */
203 protected int size(Commit<Void> commit) {
204 return backingMap.values()
205 .stream()
206 .map(valueCollection -> valueCollection.values().size())
207 .collect(Collectors.summingInt(size -> size));
208 }
209
210 /**
211 * Handles an IsEmpty commit.
212 *
213 * @param commit IsEmpty commit
214 * @return true if the multimap contains no key-value pairs, else false
215 */
216 protected boolean isEmpty(Commit<Void> commit) {
217 return backingMap.isEmpty();
218 }
219
220 /**
221 * Handles a contains key commit.
222 *
223 * @param commit ContainsKey commit
224 * @return returns true if the key is in the multimap, else false
225 */
226 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
227 return backingMap.containsKey(commit.value().key());
228 }
229
230 /**
231 * Handles a ContainsValue commit.
232 *
233 * @param commit ContainsValue commit
234 * @return true if the value is in the multimap, else false
235 */
236 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
237 if (backingMap.values().isEmpty()) {
238 return false;
239 }
240 Match<byte[]> match = Match.ifValue(commit.value().value());
241 return backingMap
242 .values()
243 .stream()
244 .anyMatch(valueList ->
245 valueList
246 .values()
247 .stream()
248 .anyMatch(byteValue ->
249 match.matches(byteValue)));
250 }
251
252 /**
253 * Handles a ContainsEntry commit.
254 *
255 * @param commit ContainsEntry commit
256 * @return true if the key-value pair exists, else false
257 */
258 protected boolean containsEntry(Commit<? extends ContainsEntry> commit) {
259 MapEntryValue entryValue =
260 backingMap.get(commit.value().key());
261 if (entryValue == null) {
262 return false;
263 } else {
264 Match valueMatch = Match.ifValue(commit.value().value());
265 return entryValue
266 .values()
267 .stream()
268 .anyMatch(byteValue -> valueMatch.matches(byteValue));
269 }
270 }
271
272 /**
273 * Handles a Clear commit.
274 *
275 * @param commit Clear commit
276 */
277 protected void clear(Commit<Void> commit) {
278 backingMap.clear();
279 }
280
281 /**
282 * Handles a KeySet commit.
283 *
284 * @param commit KeySet commit
285 * @return a set of all keys in the multimap
286 */
287 protected Set<String> keySet(Commit<Void> commit) {
288 return ImmutableSet.copyOf(backingMap.keySet());
289 }
290
291 /**
292 * Handles a Keys commit.
293 *
294 * @param commit Keys commit
295 * @return a multiset of keys with each key included an equal number of
296 * times to the total key-value pairs in which that key participates
297 */
298 protected Multiset<String> keys(Commit<Void> commit) {
299 Multiset keys = HashMultiset.create();
300 backingMap.forEach((key, mapEntryValue) -> {
301 keys.add(key, mapEntryValue.values().size());
302 });
303 return keys;
304 }
305
306 /**
307 * Handles a Values commit.
308 *
309 * @param commit Values commit
310 * @return the set of values in the multimap with duplicates included
311 */
312 protected Multiset<byte[]> values(Commit<Void> commit) {
313 return backingMap
314 .values()
315 .stream()
316 .collect(new HashMultisetValueCollector());
317 }
318
319 /**
320 * Handles an Entries commit.
321 *
322 * @param commit Entries commit
323 * @return a set of all key-value pairs in the multimap
324 */
325 protected Collection<Map.Entry<String, byte[]>> entries(Commit<Void> commit) {
326 return backingMap
327 .entrySet()
328 .stream()
329 .collect(new EntrySetCollector());
330 }
331
332 /**
333 * Handles a Get commit.
334 *
335 * @param commit Get commit
336 * @return the collection of values associated with the key or an empty
337 * list if none exist
338 */
339 protected Versioned<Collection<? extends byte[]>> get(Commit<? extends Get> commit) {
340 return toVersioned(backingMap.get(commit.value().key()));
341 }
342
343 /**
344 * Handles a removeAll commit, and returns the previous mapping.
345 *
346 * @param commit removeAll commit
347 * @return collection of removed values
348 */
349 protected Versioned<Collection<? extends byte[]>> removeAll(Commit<? extends RemoveAll> commit) {
350 String key = commit.value().key();
351
352 if (!backingMap.containsKey(key)) {
353 return new Versioned<>(Sets.newHashSet(), -1);
354 }
355
356 Versioned<Collection<? extends byte[]>> removedValues =
357 backingMap.get(key).addCommit(commit);
358 publish(removedValues.value().stream()
359 .map(value -> new MultimapEvent<String, byte[]>(
360 "", key, null, value))
361 .collect(Collectors.toList()));
362 return removedValues;
363 }
364
365 /**
366 * Handles a multiRemove commit, returns true if the remove results in any
367 * change.
368 * @param commit multiRemove commit
369 * @return true if any change results, else false
370 */
371 protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
372 String key = commit.value().key();
373
374 if (!backingMap.containsKey(key)) {
375 return false;
376 }
377
378 Versioned<Collection<? extends byte[]>> removedValues = backingMap
Jordan Halterman8c57a092018-06-04 14:53:06 -0700379 .get(key)
380 .addCommit(commit);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700381
382 if (removedValues != null) {
383 if (removedValues.value().isEmpty()) {
384 backingMap.remove(key);
385 }
386
387 publish(removedValues.value().stream()
Jordan Halterman8c57a092018-06-04 14:53:06 -0700388 .map(value -> new MultimapEvent<String, byte[]>(
389 "", key, null, value))
390 .collect(Collectors.toList()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700391 return true;
392 }
393
394 return false;
395 }
396
397 /**
Jordan Halterman8c57a092018-06-04 14:53:06 -0700398 * Handles a removeAndGet commit.
399 *
400 * @param commit multiRemove commit
401 * @return the updated values or null if the values are empty
402 */
403 protected Versioned<Collection<? extends byte[]>> removeAndGet(Commit<? extends MultiRemove> commit) {
404 String key = commit.value().key();
405
406 if (!backingMap.containsKey(key)) {
407 return null;
408 }
409
410 Versioned<Collection<? extends byte[]>> removedValues = backingMap
411 .get(key)
412 .addCommit(commit);
413
414 if (removedValues != null) {
415 if (removedValues.value().isEmpty()) {
416 backingMap.remove(key);
417 }
418
419 publish(removedValues.value().stream()
420 .map(value -> new MultimapEvent<String, byte[]>(
421 "", key, null, value))
422 .collect(Collectors.toList()));
423 }
424
425 return toVersioned(backingMap.get(key));
426 }
427
428 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700429 * Handles a put commit, returns true if any change results from this
430 * commit.
431 * @param commit a put commit
432 * @return true if this commit results in a change, else false
433 */
434 protected boolean put(Commit<? extends Put> commit) {
435 String key = commit.value().key();
436 if (commit.value().values().isEmpty()) {
437 return false;
438 }
439 if (!backingMap.containsKey(key)) {
440 backingMap.put(key, new NonTransactionalCommit());
441 }
442
443 Versioned<Collection<? extends byte[]>> addedValues = backingMap
Jordan Halterman8c57a092018-06-04 14:53:06 -0700444 .get(key)
445 .addCommit(commit);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700446
447 if (addedValues != null) {
448 publish(addedValues.value().stream()
Jordan Halterman8c57a092018-06-04 14:53:06 -0700449 .map(value -> new MultimapEvent<String, byte[]>(
450 "", key, value, null))
451 .collect(Collectors.toList()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700452 return true;
453 }
454
455 return false;
456 }
457
Jordan Halterman8c57a092018-06-04 14:53:06 -0700458 /**
459 * Handles a putAndGet commit.
460 *
461 * @param commit a put commit
462 * @return the updated values
463 */
464 protected Versioned<Collection<? extends byte[]>> putAndGet(Commit<? extends Put> commit) {
465 String key = commit.value().key();
466 if (commit.value().values().isEmpty()) {
467 return null;
468 }
469 if (!backingMap.containsKey(key)) {
470 backingMap.put(key, new NonTransactionalCommit());
471 }
472
473 Versioned<Collection<? extends byte[]>> addedValues = backingMap
474 .get(key)
475 .addCommit(commit);
476
477 if (addedValues != null) {
478 publish(addedValues.value().stream()
479 .map(value -> new MultimapEvent<String, byte[]>(
480 "", key, value, null))
481 .collect(Collectors.toList()));
482 }
483
484 return toVersioned(backingMap.get(key));
485 }
486
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700487 protected Versioned<Collection<? extends byte[]>> replace(
488 Commit<? extends Replace> commit) {
Jordan Haltermanc8b591e2018-07-11 09:49:15 -0700489 String key = commit.value().key();
490 if (!backingMap.containsKey(key)) {
491 backingMap.put(key, new NonTransactionalCommit());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700492 }
Jordan Haltermanc8b591e2018-07-11 09:49:15 -0700493
494 Versioned<Collection<? extends byte[]>> values = backingMap.get(commit.value().key()).addCommit(commit);
495 if (values != null) {
496 Set<byte[]> addedValues = Sets.newTreeSet(new ByteArrayComparator());
497 addedValues.addAll(commit.value().values());
498
499 Set<byte[]> removedValues = Sets.newTreeSet(new ByteArrayComparator());
500 removedValues.addAll(values.value());
501
502 List<MultimapEvent<String, byte[]>> events = Lists.newArrayList();
503 Sets.difference(removedValues, addedValues)
504 .forEach(value -> events.add(new MultimapEvent<>("", key, null, value)));
505 Sets.difference(addedValues, removedValues)
506 .forEach(value -> events.add(new MultimapEvent<>("", key, value, null)));
507
508 publish(events);
509 }
510 return values;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700511 }
512
513 /**
514 * Handles a listen commit.
515 *
516 * @param commit listen commit
517 */
518 protected void listen(Commit<Void> commit) {
519 listeners.put(commit.session().sessionId().id(), commit.session());
520 }
521
522 /**
523 * Handles an unlisten commit.
524 *
525 * @param commit unlisten commit
526 */
527 protected void unlisten(Commit<Void> commit) {
528 listeners.remove(commit.session().sessionId().id());
529 }
530
531 /**
Jordan Halterman15f33712018-06-21 00:00:15 -0700532 * Handles an open iterator commit.
Jordan Halterman5e884352018-05-21 22:11:07 -0700533 *
Jordan Halterman15f33712018-06-21 00:00:15 -0700534 * @param commit the open iterator commit
535 * @return iterator identifier
Jordan Halterman5e884352018-05-21 22:11:07 -0700536 */
Jordan Halterman15f33712018-06-21 00:00:15 -0700537 protected long openIterator(Commit<Void> commit) {
538 iterators.put(commit.index(), new IteratorContext(
539 commit.session().sessionId().id(),
540 backingMap.entrySet().iterator()));
541 return commit.index();
542 }
543
544 /**
545 * Handles an iterator next commit.
546 *
547 * @param commit the next commit
548 * @return a list of entries to iterate
549 */
550 protected IteratorBatch next(Commit<IteratorPosition> commit) {
551 final long iteratorId = commit.value().iteratorId();
552 final int position = commit.value().position();
553
554 IteratorContext context = iterators.get(iteratorId);
555 if (context == null) {
556 return null;
557 }
558
559 List<Map.Entry<String, byte[]>> entries = new ArrayList<>();
560 int size = 0;
561 while (context.iterator.hasNext()) {
562 context.position++;
563 if (context.position > position) {
564 Map.Entry<String, MapEntryValue> entry = context.iterator.next();
565 String key = entry.getKey();
566 int keySize = key.length();
567 for (byte[] value : entry.getValue().values()) {
568 entries.add(Maps.immutableEntry(key, value));
569 size += keySize;
570 size += value.length;
571 }
572
573 if (size >= MAX_ITERATOR_BATCH_SIZE) {
574 break;
575 }
Jordan Halterman5e884352018-05-21 22:11:07 -0700576 }
577 }
Jordan Halterman15f33712018-06-21 00:00:15 -0700578
579 if (entries.isEmpty()) {
580 return null;
581 }
582 return new IteratorBatch(context.position, entries);
583 }
584
585 /**
586 * Handles a close iterator commit.
587 *
588 * @param commit the close iterator commit
589 */
590 protected void closeIterator(Commit<Long> commit) {
591 iterators.remove(commit.value());
Jordan Halterman5e884352018-05-21 22:11:07 -0700592 }
593
594 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700595 * Publishes events to listeners.
596 *
597 * @param events list of map event to publish
598 */
599 private void publish(List<MultimapEvent<String, byte[]>> events) {
600 listeners.values().forEach(session -> session.publish(CHANGE, serializer::encode, events));
601 }
602
603 private interface MapEntryValue {
604
605 /**
606 * Returns the list of raw {@code byte[]'s}.
607 *
608 * @return list of raw values
609 */
610 Collection<? extends byte[]> values();
611
612 /**
613 * Returns the version of the value.
614 *
615 * @return version
616 */
617 long version();
618
619 /**
620 * Add a new commit and modifies the set of values accordingly.
621 * In the case of a replace or removeAll it returns the set of removed
622 * values. In the case of put or multiRemove it returns null for no
623 * change and a set of the added or removed values respectively if a
624 * change resulted.
625 *
626 * @param commit the commit to be added
627 */
628 Versioned<Collection<? extends byte[]>> addCommit(
629 Commit<? extends MultimapOperation> commit);
630 }
631
632 private class NonTransactionalCommit implements MapEntryValue {
633 private long version;
634 private final TreeSet<byte[]> valueSet = Sets.newTreeSet(new ByteArrayComparator());
635
636 public NonTransactionalCommit() {
637 //Set the version to current it will only be updated once this is
638 // populated
639 this.version = globalVersion.get();
640 }
641
642 @Override
643 public Collection<? extends byte[]> values() {
644 return ImmutableSet.copyOf(valueSet);
645 }
646
647 @Override
648 public long version() {
649 return version;
650 }
651
652 @Override
653 public Versioned<Collection<? extends byte[]>> addCommit(
654 Commit<? extends MultimapOperation> commit) {
655 Preconditions.checkNotNull(commit);
656 Preconditions.checkNotNull(commit.value());
657 Versioned<Collection<? extends byte[]>> retVersion;
658
659 if (commit.value() instanceof Put) {
660 //Using a treeset here sanitizes the input, removing duplicates
661 Set<byte[]> valuesToAdd =
662 Sets.newTreeSet(new ByteArrayComparator());
663 ((Put) commit.value()).values().forEach(value -> {
664 if (!valueSet.contains(value)) {
665 valuesToAdd.add(value);
666 }
667 });
668 if (valuesToAdd.isEmpty()) {
669 //Do not increment or add the commit if no change resulted
670 return null;
671 }
672 retVersion = new Versioned<>(valuesToAdd, version);
673 valuesToAdd.forEach(value -> valueSet.add(value));
674 version++;
675 return retVersion;
676
677 } else if (commit.value() instanceof Replace) {
678 //Will this work?? Need to check before check-in!
679 Set<byte[]> removedValues = Sets.newHashSet();
680 removedValues.addAll(valueSet);
681 retVersion = new Versioned<>(removedValues, version);
682 valueSet.clear();
683 Set<byte[]> valuesToAdd =
684 Sets.newTreeSet(new ByteArrayComparator());
685 ((Replace) commit.value()).values().forEach(value -> {
686 valuesToAdd.add(value);
687 });
688 if (valuesToAdd.isEmpty()) {
689 version = globalVersion.incrementAndGet();
690 backingMap.remove(((Replace) commit.value()).key());
691 return retVersion;
692 }
693 valuesToAdd.forEach(value -> valueSet.add(value));
694 version = globalVersion.incrementAndGet();
695 return retVersion;
696
697 } else if (commit.value() instanceof RemoveAll) {
698 Set<byte[]> removed = Sets.newHashSet();
699 //We can assume here that values only appear once and so we
700 //do not need to sanitize the return for duplicates.
701 removed.addAll(valueSet);
702 retVersion = new Versioned<>(removed, version);
703 valueSet.clear();
704 //In the case of a removeAll all commits will be removed and
705 //unlike the multiRemove case we do not need to consider
706 //dependencies among additive and removal commits.
707
708 //Save the key for use after the commit is closed
709 String key = ((RemoveAll) commit.value()).key();
710 version = globalVersion.incrementAndGet();
711 backingMap.remove(key);
712 return retVersion;
713
714 } else if (commit.value() instanceof MultiRemove) {
715 //Must first calculate how many commits the removal depends on.
716 //At this time we also sanitize the removal set by adding to a
717 //set with proper handling of byte[] equality.
718 Set<byte[]> removed = Sets.newHashSet();
719 ((MultiRemove) commit.value()).values().forEach(value -> {
720 if (valueSet.contains(value)) {
721 removed.add(value);
722 }
723 });
724 //If there is nothing to be removed no action should be taken.
725 if (removed.isEmpty()) {
726 return null;
727 }
728 //Save key in case countdown results in closing the commit.
729 String removedKey = ((MultiRemove) commit.value()).key();
730 removed.forEach(removedValue -> {
731 valueSet.remove(removedValue);
732 });
733 //The version is updated locally as well as globally even if
734 //this object will be removed from the map in case any other
735 //party still holds a reference to this object.
736 retVersion = new Versioned<>(removed, version);
737 version = globalVersion.incrementAndGet();
738 if (valueSet.isEmpty()) {
739 backingMap.remove(removedKey);
740 }
741 return retVersion;
742
743 } else {
744 throw new IllegalArgumentException();
745 }
746 }
747 }
748
749 /**
750 * A collector that creates MapEntryValues and creates a multiset of all
751 * values in the map an equal number of times to the number of sets in
752 * which they participate.
753 */
754 private class HashMultisetValueCollector implements
755 Collector<MapEntryValue,
756 HashMultiset<byte[]>,
757 HashMultiset<byte[]>> {
758
759 @Override
760 public Supplier<HashMultiset<byte[]>> supplier() {
761 return HashMultiset::create;
762 }
763
764 @Override
765 public BiConsumer<HashMultiset<byte[]>, MapEntryValue> accumulator() {
766 return (multiset, mapEntryValue) ->
767 multiset.addAll(mapEntryValue.values());
768 }
769
770 @Override
771 public BinaryOperator<HashMultiset<byte[]>> combiner() {
772 return (setOne, setTwo) -> {
773 setOne.addAll(setTwo);
774 return setOne;
775 };
776 }
777
778 @Override
779 public Function<HashMultiset<byte[]>,
780 HashMultiset<byte[]>> finisher() {
781 return Function.identity();
782 }
783
784 @Override
785 public Set<Characteristics> characteristics() {
786 return EnumSet.of(Characteristics.UNORDERED);
787 }
788 }
789
790 /**
791 * A collector that creates Entries of {@code <String, MapEntryValue>} and
792 * creates a set of entries all key value pairs in the map.
793 */
794 private class EntrySetCollector implements
795 Collector<Map.Entry<String, MapEntryValue>,
796 Set<Map.Entry<String, byte[]>>,
797 Set<Map.Entry<String, byte[]>>> {
798 private Set<Map.Entry<String, byte[]>> set = null;
799
800 @Override
801 public Supplier<Set<Map.Entry<String, byte[]>>> supplier() {
802 return () -> {
803 if (set == null) {
804 set = Sets.newHashSet();
805 }
806 return set;
807 };
808 }
809
810 @Override
811 public BiConsumer<Set<Map.Entry<String, byte[]>>,
812 Map.Entry<String, MapEntryValue>> accumulator() {
813 return (set, entry) -> {
814 entry
815 .getValue()
816 .values()
817 .forEach(byteValue ->
818 set.add(Maps.immutableEntry(entry.getKey(),
819 byteValue)));
820 };
821 }
822
823 @Override
824 public BinaryOperator<Set<Map.Entry<String, byte[]>>> combiner() {
825 return (setOne, setTwo) -> {
826 setOne.addAll(setTwo);
827 return setOne;
828 };
829 }
830
831 @Override
832 public Function<Set<Map.Entry<String, byte[]>>,
833 Set<Map.Entry<String, byte[]>>> finisher() {
834 return (unused) -> set;
835 }
836
837 @Override
838 public Set<Characteristics> characteristics() {
839 return EnumSet.of(Characteristics.UNORDERED);
840 }
841 }
842 /**
843 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
844 * @param value map entry value
845 * @return versioned instance or an empty list versioned -1 if argument is
846 * null
847 */
848 private Versioned<Collection<? extends byte[]>> toVersioned(
849 MapEntryValue value) {
850 return value == null ? new Versioned<>(Lists.newArrayList(), -1) :
851 new Versioned<>(value.values(),
852 value.version());
853 }
854
855 private static class ByteArrayComparator implements Comparator<byte[]> {
856
857 @Override
858 public int compare(byte[] o1, byte[] o2) {
859 if (Arrays.equals(o1, o2)) {
860 return 0;
861 } else {
862 for (int i = 0; i < o1.length && i < o2.length; i++) {
863 if (o1[i] < o2[i]) {
864 return -1;
865 } else if (o1[i] > o2[i]) {
866 return 1;
867 }
868 }
869 return o1.length > o2.length ? 1 : -1;
870 }
871 }
872 }
Jordan Halterman15f33712018-06-21 00:00:15 -0700873
874 private static class IteratorContext {
875 private final long sessionId;
876 private final Iterator<Map.Entry<String, MapEntryValue>> iterator;
877 private int position = 0;
878
879 IteratorContext(long sessionId, Iterator<Map.Entry<String, MapEntryValue>> iterator) {
880 this.sessionId = sessionId;
881 this.iterator = iterator;
882 }
883 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700884}