blob: de22a751d96edaa0a6ed3b3e7f25a3523fef55ae [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.onosproject.store.service.MapEvent.Type.INSERT;
19import static org.onosproject.store.service.MapEvent.Type.REMOVE;
20import static org.onosproject.store.service.MapEvent.Type.UPDATE;
21import io.atomix.copycat.client.session.Session;
22import io.atomix.copycat.server.Commit;
23import io.atomix.copycat.server.Snapshottable;
24import io.atomix.copycat.server.StateMachineExecutor;
25import io.atomix.copycat.server.session.SessionListener;
26import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
27import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
28import io.atomix.resource.ResourceStateMachine;
29
30import java.util.Collection;
31import java.util.HashMap;
32import java.util.Iterator;
33import java.util.Map;
34import java.util.Set;
35import java.util.concurrent.atomic.AtomicLong;
36import java.util.stream.Collectors;
37
38import org.onlab.util.CountDownCompleter;
39import org.onlab.util.Match;
Madan Jampanicadd70b2016-02-08 13:45:43 -080040import org.onosproject.store.primitives.TransactionId;
41import org.onosproject.store.primitives.impl.Transaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080042import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
43import org.onosproject.store.service.MapEvent;
44import org.onosproject.store.service.Versioned;
45
46import com.google.common.collect.Maps;
47import com.google.common.collect.Sets;
48
49import static com.google.common.base.Preconditions.checkState;
50
51/**
52 * State Machine for {@link AtomixConsistentMap} resource.
53 */
54public class AtomixConsistentMapState extends ResourceStateMachine implements
55 SessionListener, Snapshottable {
56 private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
57 private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
58 private final Set<String> preparedKeys = Sets.newHashSet();
59 private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps
60 .newHashMap();
61 private AtomicLong versionCounter = new AtomicLong(0);
62
63 @Override
64 public void snapshot(SnapshotWriter writer) {
65 writer.writeLong(versionCounter.get());
66 }
67
68 @Override
69 public void install(SnapshotReader reader) {
70 versionCounter = new AtomicLong(reader.readLong());
71 }
72
73 @Override
74 protected void configure(StateMachineExecutor executor) {
75 // Listeners
76 executor.register(AtomixConsistentMapCommands.Listen.class,
77 this::listen);
78 executor.register(AtomixConsistentMapCommands.Unlisten.class,
79 this::unlisten);
80 // Queries
81 executor.register(AtomixConsistentMapCommands.ContainsKey.class,
82 this::containsKey);
83 executor.register(AtomixConsistentMapCommands.ContainsValue.class,
84 this::containsValue);
85 executor.register(AtomixConsistentMapCommands.EntrySet.class,
86 this::entrySet);
87 executor.register(AtomixConsistentMapCommands.Get.class, this::get);
88 executor.register(AtomixConsistentMapCommands.IsEmpty.class,
89 this::isEmpty);
90 executor.register(AtomixConsistentMapCommands.KeySet.class,
91 this::keySet);
92 executor.register(AtomixConsistentMapCommands.Size.class, this::size);
93 executor.register(AtomixConsistentMapCommands.Values.class,
94 this::values);
95 // Commands
96 executor.register(AtomixConsistentMapCommands.UpdateAndGet.class,
97 this::updateAndGet);
98 executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
99 executor.register(AtomixConsistentMapCommands.TransactionPrepare.class,
100 this::prepare);
101 executor.register(AtomixConsistentMapCommands.TransactionCommit.class,
102 this::commit);
103 executor.register(
104 AtomixConsistentMapCommands.TransactionRollback.class,
105 this::rollback);
106 }
107
108 @Override
109 public void delete() {
110 // Delete Listeners
111 listeners.values().forEach(Commit::close);
112 listeners.clear();
113
114 // Delete Map entries
115 mapEntries.values().forEach(MapEntryValue::discard);
116 mapEntries.clear();
117 }
118
119 /**
120 * Handles a contains key commit.
121 *
122 * @param commit
123 * containsKey commit
124 * @return {@code true} if map contains key
125 */
126 protected boolean containsKey(
127 Commit<? extends AtomixConsistentMapCommands.ContainsKey> commit) {
128 try {
129 return toVersioned(mapEntries.get(commit.operation().key())) != null;
130 } finally {
131 commit.close();
132 }
133 }
134
135 /**
136 * Handles a contains value commit.
137 *
138 * @param commit
139 * containsValue commit
140 * @return {@code true} if map contains value
141 */
142 protected boolean containsValue(
143 Commit<? extends AtomixConsistentMapCommands.ContainsValue> commit) {
144 try {
145 Match<byte[]> valueMatch = Match
146 .ifValue(commit.operation().value());
147 return mapEntries.values().stream()
148 .anyMatch(value -> valueMatch.matches(value.value()));
149 } finally {
150 commit.close();
151 }
152 }
153
154 /**
155 * Handles a get commit.
156 *
157 * @param commit
158 * get commit
159 * @return value mapped to key
160 */
161 protected Versioned<byte[]> get(
162 Commit<? extends AtomixConsistentMapCommands.Get> commit) {
163 try {
164 return toVersioned(mapEntries.get(commit.operation().key()));
165 } finally {
166 commit.close();
167 }
168 }
169
170 /**
171 * Handles a count commit.
172 *
173 * @param commit
174 * size commit
175 * @return number of entries in map
176 */
177 protected int size(Commit<? extends AtomixConsistentMapCommands.Size> commit) {
178 try {
179 return mapEntries.size();
180 } finally {
181 commit.close();
182 }
183 }
184
185 /**
186 * Handles an is empty commit.
187 *
188 * @param commit
189 * isEmpty commit
190 * @return {@code true} if map is empty
191 */
192 protected boolean isEmpty(
193 Commit<? extends AtomixConsistentMapCommands.IsEmpty> commit) {
194 try {
195 return mapEntries.isEmpty();
196 } finally {
197 commit.close();
198 }
199 }
200
201 /**
202 * Handles a keySet commit.
203 *
204 * @param commit
205 * keySet commit
206 * @return set of keys in map
207 */
208 protected Set<String> keySet(
209 Commit<? extends AtomixConsistentMapCommands.KeySet> commit) {
210 try {
211 return mapEntries.keySet();
212 } finally {
213 commit.close();
214 }
215 }
216
217 /**
218 * Handles a values commit.
219 *
220 * @param commit
221 * values commit
222 * @return collection of values in map
223 */
224 protected Collection<Versioned<byte[]>> values(
225 Commit<? extends AtomixConsistentMapCommands.Values> commit) {
226 try {
227 return mapEntries.values().stream().map(this::toVersioned)
228 .collect(Collectors.toList());
229 } finally {
230 commit.close();
231 }
232 }
233
234 /**
235 * Handles a entry set commit.
236 *
237 * @param commit
238 * entrySet commit
239 * @return set of map entries
240 */
241 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
242 Commit<? extends AtomixConsistentMapCommands.EntrySet> commit) {
243 try {
244 return mapEntries
245 .entrySet()
246 .stream()
247 .map(e -> Maps.immutableEntry(e.getKey(),
248 toVersioned(e.getValue())))
249 .collect(Collectors.toSet());
250 } finally {
251 commit.close();
252 }
253 }
254
255 /**
256 * Handles a update and get commit.
257 *
258 * @param commit
259 * updateAndGet commit
260 * @return update result
261 */
262 protected MapEntryUpdateResult<String, byte[]> updateAndGet(
263 Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
264 MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
265 String key = commit.operation().key();
266 MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
267 Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
268
269 if (updateStatus != MapEntryUpdateResult.Status.OK) {
270 commit.close();
271 return new MapEntryUpdateResult<>(updateStatus, "", key,
272 oldMapValue, oldMapValue);
273 }
274
275 byte[] newValue = commit.operation().value();
276 long newVersion = versionCounter.incrementAndGet();
277 Versioned<byte[]> newMapValue = newValue == null ? null
278 : new Versioned<>(newValue, newVersion);
279
280 MapEvent.Type updateType = newValue == null ? REMOVE
281 : oldCommitValue == null ? INSERT : UPDATE;
282 if (updateType == REMOVE || updateType == UPDATE) {
283 mapEntries.remove(key);
284 oldCommitValue.discard();
285 }
286 if (updateType == INSERT || updateType == UPDATE) {
287 mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
288 }
289 notify(new MapEvent<>("", key, newMapValue, oldMapValue));
290 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
291 newMapValue);
292 }
293
294 /**
295 * Handles a clear commit.
296 *
297 * @param commit
298 * clear commit
299 * @return clear result
300 */
301 protected MapEntryUpdateResult.Status clear(
302 Commit<? extends AtomixConsistentMapCommands.Clear> commit) {
303 try {
304 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
305 .entrySet().iterator();
306 while (iterator.hasNext()) {
307 Map.Entry<String, MapEntryValue> entry = iterator.next();
308 String key = entry.getKey();
309 MapEntryValue value = entry.getValue();
310 Versioned<byte[]> removedValue = new Versioned<>(value.value(),
311 value.version());
312 notify(new MapEvent<>("", key, null, removedValue));
313 value.discard();
314 iterator.remove();
315 }
316 return MapEntryUpdateResult.Status.OK;
317 } finally {
318 commit.close();
319 }
320 }
321
322 /**
323 * Handles a listen commit.
324 *
325 * @param commit
326 * listen commit
327 */
328 protected void listen(
329 Commit<? extends AtomixConsistentMapCommands.Listen> commit) {
330 Long sessionId = commit.session().id();
331 listeners.put(sessionId, commit);
332 commit.session()
333 .onStateChange(
334 state -> {
335 if (state == Session.State.CLOSED
336 || state == Session.State.EXPIRED) {
337 Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
338 .remove(sessionId);
339 if (listener != null) {
340 listener.close();
341 }
342 }
343 });
344 }
345
346 /**
347 * Handles an unlisten commit.
348 *
349 * @param commit
350 * unlisten commit
351 */
352 protected void unlisten(
353 Commit<? extends AtomixConsistentMapCommands.Unlisten> commit) {
354 try {
355 Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
356 .remove(commit.session());
357 if (listener != null) {
358 listener.close();
359 }
360 } finally {
361 commit.close();
362 }
363 }
364
365 /**
366 * Triggers a change event.
367 *
368 * @param value
369 * map event
370 */
371 private void notify(MapEvent<String, byte[]> value) {
372 listeners.values().forEach(
373 commit -> commit.session().publish("change", value));
374 }
375
376 /**
377 * Handles an prepare commit.
378 *
379 * @param commit
380 * transaction prepare commit
381 * @return prepare result
382 */
383 protected PrepareResult prepare(
384 Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
385 boolean ok = false;
386 try {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800387 Transaction transaction = commit.operation().transaction();
388 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800389 String key = update.key();
390 if (preparedKeys.contains(key)) {
391 return PrepareResult.CONCURRENT_TRANSACTION;
392 }
393 MapEntryValue existingValue = mapEntries.get(key);
394 if (existingValue == null) {
395 if (update.currentValue() != null) {
396 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
397 }
398 } else {
399 if (existingValue.version() != update.currentVersion()) {
400 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
401 }
402 }
403 }
404 // No violations detected. Add to pendingTranctions and mark
405 // modified keys as
406 // currently locked to updates.
Madan Jampanicadd70b2016-02-08 13:45:43 -0800407 pendingTransactions.put(transaction.id(), commit);
408 transaction.updates().forEach(u -> preparedKeys.add(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800409 ok = true;
410 return PrepareResult.OK;
411 } finally {
412 if (!ok) {
413 commit.close();
414 }
415 }
416 }
417
418 /**
419 * Handles an commit commit (ha!).
420 *
421 * @param commit transaction commit commit
422 * @return commit result
423 */
424 protected CommitResult commit(
425 Commit<? extends AtomixConsistentMapCommands.TransactionCommit> commit) {
426 TransactionId transactionId = commit.operation().transactionId();
427 try {
428 Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
429 .remove(transactionId);
430 if (prepareCommit == null) {
431 return CommitResult.UNKNOWN_TRANSACTION_ID;
432 }
Madan Jampanicadd70b2016-02-08 13:45:43 -0800433 Transaction transaction = prepareCommit.operation().transaction();
434 long totalReferencesToCommit = transaction
435 .updates()
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800436 .stream()
437 .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
438 .count();
439 CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer =
440 new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800441 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800442 String key = update.key();
443 MapEntryValue previousValue = mapEntries.remove(key);
444 MapEntryValue newValue = null;
445 checkState(preparedKeys.remove(key), "key is not prepared");
446 if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
447 newValue = new TransactionalCommit(key,
448 versionCounter.incrementAndGet(), completer);
449 }
450 mapEntries.put(key, newValue);
451 // Notify map listeners
452 notify(new MapEvent<>("", key, toVersioned(newValue),
453 toVersioned(previousValue)));
454 }
455 return CommitResult.OK;
456 } finally {
457 commit.close();
458 }
459 }
460
461 /**
462 * Handles an rollback commit (ha!).
463 *
464 * @param commit transaction rollback commit
465 * @return rollback result
466 */
467 protected RollbackResult rollback(
468 Commit<? extends AtomixConsistentMapCommands.TransactionRollback> commit) {
469 TransactionId transactionId = commit.operation().transactionId();
470 try {
471 Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
472 .remove(transactionId);
473 if (prepareCommit == null) {
474 return RollbackResult.UNKNOWN_TRANSACTION_ID;
475 } else {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800476 prepareCommit.operation()
477 .transaction()
478 .updates()
479 .forEach(u -> preparedKeys.remove(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800480 prepareCommit.close();
481 return RollbackResult.OK;
482 }
483 } finally {
484 commit.close();
485 }
486 }
487
488 private MapEntryUpdateResult.Status validate(
489 AtomixConsistentMapCommands.UpdateAndGet update) {
490 MapEntryValue existingValue = mapEntries.get(update.key());
491 if (existingValue == null && update.value() == null) {
492 return MapEntryUpdateResult.Status.NOOP;
493 }
494 if (preparedKeys.contains(update.key())) {
495 return MapEntryUpdateResult.Status.WRITE_LOCK;
496 }
497 byte[] existingRawValue = existingValue == null ? null : existingValue
498 .value();
499 Long existingVersion = existingValue == null ? null : existingValue
500 .version();
501 return update.valueMatch().matches(existingRawValue)
502 && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
503 : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
504 }
505
506 private Versioned<byte[]> toVersioned(MapEntryValue value) {
507 return value == null ? null : new Versioned<>(value.value(),
508 value.version());
509 }
510
511 @Override
512 public void register(Session session) {
513 }
514
515 @Override
516 public void unregister(Session session) {
517 closeListener(session.id());
518 }
519
520 @Override
521 public void expire(Session session) {
522 closeListener(session.id());
523 }
524
525 @Override
526 public void close(Session session) {
527 closeListener(session.id());
528 }
529
530 private void closeListener(Long sessionId) {
531 Commit<? extends AtomixConsistentMapCommands.Listen> commit = listeners
532 .remove(sessionId);
533 if (commit != null) {
534 commit.close();
535 }
536 }
537
538 /**
539 * Interface implemented by map values.
540 */
541 private interface MapEntryValue {
542 /**
543 * Returns the raw {@code byte[]}.
544 *
545 * @return raw value
546 */
547 byte[] value();
548
549 /**
550 * Returns the version of the value.
551 *
552 * @return version
553 */
554 long version();
555
556 /**
557 * Discards the value by invoke appropriate clean up actions.
558 */
559 void discard();
560 }
561
562 /**
563 * A {@code MapEntryValue} that is derived from a non-transactional update
564 * i.e. via any standard map update operation.
565 */
566 private class NonTransactionalCommit implements MapEntryValue {
567 private final long version;
568 private final Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit;
569
570 public NonTransactionalCommit(
571 long version,
572 Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
573 this.version = version;
574 this.commit = commit;
575 }
576
577 @Override
578 public byte[] value() {
579 return commit.operation().value();
580 }
581
582 @Override
583 public long version() {
584 return version;
585 }
586
587 @Override
588 public void discard() {
589 commit.close();
590 }
591 }
592
593 /**
594 * A {@code MapEntryValue} that is derived from updates submitted via a
595 * transaction.
596 */
597 private class TransactionalCommit implements MapEntryValue {
598 private final String key;
599 private final long version;
600 private final CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer;
601
602 public TransactionalCommit(
603 String key,
604 long version,
605 CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> commit) {
606 this.key = key;
607 this.version = version;
608 this.completer = commit;
609 }
610
611 @Override
612 public byte[] value() {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800613 Transaction transaction = completer.object().operation().transaction();
614 return valueForKey(key, transaction);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800615 }
616
617 @Override
618 public long version() {
619 return version;
620 }
621
622 @Override
623 public void discard() {
624 completer.countDown();
625 }
Madan Jampanicadd70b2016-02-08 13:45:43 -0800626
627 private byte[] valueForKey(String key, Transaction transaction) {
628 MapUpdate<String, byte[]> update = transaction.updates()
629 .stream()
630 .filter(u -> u.key().equals(key))
631 .findFirst()
632 .orElse(null);
633 return update == null ? null : update.value();
634 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800635 }
636}