blob: e580fed4b5f43d1db07e200f96c05721dab5365c [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.onosproject.store.service.MapEvent.Type.INSERT;
19import static org.onosproject.store.service.MapEvent.Type.REMOVE;
20import static org.onosproject.store.service.MapEvent.Type.UPDATE;
21import io.atomix.copycat.client.session.Session;
22import io.atomix.copycat.server.Commit;
23import io.atomix.copycat.server.Snapshottable;
24import io.atomix.copycat.server.StateMachineExecutor;
25import io.atomix.copycat.server.session.SessionListener;
26import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
27import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
28import io.atomix.resource.ResourceStateMachine;
29
30import java.util.Collection;
31import java.util.HashMap;
32import java.util.Iterator;
33import java.util.Map;
34import java.util.Set;
35import java.util.concurrent.atomic.AtomicLong;
36import java.util.stream.Collectors;
37
38import org.onlab.util.CountDownCompleter;
39import org.onlab.util.Match;
Madan Jampani74da78b2016-02-09 21:18:36 -080040import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080041import org.onosproject.store.primitives.TransactionId;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080042import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
43import org.onosproject.store.service.MapEvent;
Madan Jampani74da78b2016-02-09 21:18:36 -080044import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080045import org.onosproject.store.service.Versioned;
46
47import com.google.common.collect.Maps;
48import com.google.common.collect.Sets;
49
50import static com.google.common.base.Preconditions.checkState;
51
52/**
53 * State Machine for {@link AtomixConsistentMap} resource.
54 */
55public class AtomixConsistentMapState extends ResourceStateMachine implements
56 SessionListener, Snapshottable {
57 private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
58 private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
59 private final Set<String> preparedKeys = Sets.newHashSet();
60 private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps
61 .newHashMap();
62 private AtomicLong versionCounter = new AtomicLong(0);
63
64 @Override
65 public void snapshot(SnapshotWriter writer) {
66 writer.writeLong(versionCounter.get());
67 }
68
69 @Override
70 public void install(SnapshotReader reader) {
71 versionCounter = new AtomicLong(reader.readLong());
72 }
73
74 @Override
75 protected void configure(StateMachineExecutor executor) {
76 // Listeners
77 executor.register(AtomixConsistentMapCommands.Listen.class,
78 this::listen);
79 executor.register(AtomixConsistentMapCommands.Unlisten.class,
80 this::unlisten);
81 // Queries
82 executor.register(AtomixConsistentMapCommands.ContainsKey.class,
83 this::containsKey);
84 executor.register(AtomixConsistentMapCommands.ContainsValue.class,
85 this::containsValue);
86 executor.register(AtomixConsistentMapCommands.EntrySet.class,
87 this::entrySet);
88 executor.register(AtomixConsistentMapCommands.Get.class, this::get);
89 executor.register(AtomixConsistentMapCommands.IsEmpty.class,
90 this::isEmpty);
91 executor.register(AtomixConsistentMapCommands.KeySet.class,
92 this::keySet);
93 executor.register(AtomixConsistentMapCommands.Size.class, this::size);
94 executor.register(AtomixConsistentMapCommands.Values.class,
95 this::values);
96 // Commands
97 executor.register(AtomixConsistentMapCommands.UpdateAndGet.class,
98 this::updateAndGet);
99 executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
100 executor.register(AtomixConsistentMapCommands.TransactionPrepare.class,
101 this::prepare);
102 executor.register(AtomixConsistentMapCommands.TransactionCommit.class,
103 this::commit);
104 executor.register(
105 AtomixConsistentMapCommands.TransactionRollback.class,
106 this::rollback);
107 }
108
109 @Override
110 public void delete() {
111 // Delete Listeners
112 listeners.values().forEach(Commit::close);
113 listeners.clear();
114
115 // Delete Map entries
116 mapEntries.values().forEach(MapEntryValue::discard);
117 mapEntries.clear();
118 }
119
120 /**
121 * Handles a contains key commit.
122 *
123 * @param commit
124 * containsKey commit
125 * @return {@code true} if map contains key
126 */
127 protected boolean containsKey(
128 Commit<? extends AtomixConsistentMapCommands.ContainsKey> commit) {
129 try {
130 return toVersioned(mapEntries.get(commit.operation().key())) != null;
131 } finally {
132 commit.close();
133 }
134 }
135
136 /**
137 * Handles a contains value commit.
138 *
139 * @param commit
140 * containsValue commit
141 * @return {@code true} if map contains value
142 */
143 protected boolean containsValue(
144 Commit<? extends AtomixConsistentMapCommands.ContainsValue> commit) {
145 try {
146 Match<byte[]> valueMatch = Match
147 .ifValue(commit.operation().value());
148 return mapEntries.values().stream()
149 .anyMatch(value -> valueMatch.matches(value.value()));
150 } finally {
151 commit.close();
152 }
153 }
154
155 /**
156 * Handles a get commit.
157 *
158 * @param commit
159 * get commit
160 * @return value mapped to key
161 */
162 protected Versioned<byte[]> get(
163 Commit<? extends AtomixConsistentMapCommands.Get> commit) {
164 try {
165 return toVersioned(mapEntries.get(commit.operation().key()));
166 } finally {
167 commit.close();
168 }
169 }
170
171 /**
172 * Handles a count commit.
173 *
174 * @param commit
175 * size commit
176 * @return number of entries in map
177 */
178 protected int size(Commit<? extends AtomixConsistentMapCommands.Size> commit) {
179 try {
180 return mapEntries.size();
181 } finally {
182 commit.close();
183 }
184 }
185
186 /**
187 * Handles an is empty commit.
188 *
189 * @param commit
190 * isEmpty commit
191 * @return {@code true} if map is empty
192 */
193 protected boolean isEmpty(
194 Commit<? extends AtomixConsistentMapCommands.IsEmpty> commit) {
195 try {
196 return mapEntries.isEmpty();
197 } finally {
198 commit.close();
199 }
200 }
201
202 /**
203 * Handles a keySet commit.
204 *
205 * @param commit
206 * keySet commit
207 * @return set of keys in map
208 */
209 protected Set<String> keySet(
210 Commit<? extends AtomixConsistentMapCommands.KeySet> commit) {
211 try {
212 return mapEntries.keySet();
213 } finally {
214 commit.close();
215 }
216 }
217
218 /**
219 * Handles a values commit.
220 *
221 * @param commit
222 * values commit
223 * @return collection of values in map
224 */
225 protected Collection<Versioned<byte[]>> values(
226 Commit<? extends AtomixConsistentMapCommands.Values> commit) {
227 try {
228 return mapEntries.values().stream().map(this::toVersioned)
229 .collect(Collectors.toList());
230 } finally {
231 commit.close();
232 }
233 }
234
235 /**
236 * Handles a entry set commit.
237 *
238 * @param commit
239 * entrySet commit
240 * @return set of map entries
241 */
242 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
243 Commit<? extends AtomixConsistentMapCommands.EntrySet> commit) {
244 try {
245 return mapEntries
246 .entrySet()
247 .stream()
248 .map(e -> Maps.immutableEntry(e.getKey(),
249 toVersioned(e.getValue())))
250 .collect(Collectors.toSet());
251 } finally {
252 commit.close();
253 }
254 }
255
256 /**
257 * Handles a update and get commit.
258 *
259 * @param commit
260 * updateAndGet commit
261 * @return update result
262 */
263 protected MapEntryUpdateResult<String, byte[]> updateAndGet(
264 Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
265 MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
266 String key = commit.operation().key();
267 MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
268 Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
269
270 if (updateStatus != MapEntryUpdateResult.Status.OK) {
271 commit.close();
272 return new MapEntryUpdateResult<>(updateStatus, "", key,
273 oldMapValue, oldMapValue);
274 }
275
276 byte[] newValue = commit.operation().value();
277 long newVersion = versionCounter.incrementAndGet();
278 Versioned<byte[]> newMapValue = newValue == null ? null
279 : new Versioned<>(newValue, newVersion);
280
281 MapEvent.Type updateType = newValue == null ? REMOVE
282 : oldCommitValue == null ? INSERT : UPDATE;
283 if (updateType == REMOVE || updateType == UPDATE) {
284 mapEntries.remove(key);
285 oldCommitValue.discard();
286 }
287 if (updateType == INSERT || updateType == UPDATE) {
288 mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
289 }
290 notify(new MapEvent<>("", key, newMapValue, oldMapValue));
291 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
292 newMapValue);
293 }
294
295 /**
296 * Handles a clear commit.
297 *
298 * @param commit
299 * clear commit
300 * @return clear result
301 */
302 protected MapEntryUpdateResult.Status clear(
303 Commit<? extends AtomixConsistentMapCommands.Clear> commit) {
304 try {
305 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
306 .entrySet().iterator();
307 while (iterator.hasNext()) {
308 Map.Entry<String, MapEntryValue> entry = iterator.next();
309 String key = entry.getKey();
310 MapEntryValue value = entry.getValue();
311 Versioned<byte[]> removedValue = new Versioned<>(value.value(),
312 value.version());
313 notify(new MapEvent<>("", key, null, removedValue));
314 value.discard();
315 iterator.remove();
316 }
317 return MapEntryUpdateResult.Status.OK;
318 } finally {
319 commit.close();
320 }
321 }
322
323 /**
324 * Handles a listen commit.
325 *
326 * @param commit
327 * listen commit
328 */
329 protected void listen(
330 Commit<? extends AtomixConsistentMapCommands.Listen> commit) {
331 Long sessionId = commit.session().id();
332 listeners.put(sessionId, commit);
333 commit.session()
334 .onStateChange(
335 state -> {
336 if (state == Session.State.CLOSED
337 || state == Session.State.EXPIRED) {
338 Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
339 .remove(sessionId);
340 if (listener != null) {
341 listener.close();
342 }
343 }
344 });
345 }
346
347 /**
348 * Handles an unlisten commit.
349 *
350 * @param commit
351 * unlisten commit
352 */
353 protected void unlisten(
354 Commit<? extends AtomixConsistentMapCommands.Unlisten> commit) {
355 try {
356 Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
357 .remove(commit.session());
358 if (listener != null) {
359 listener.close();
360 }
361 } finally {
362 commit.close();
363 }
364 }
365
366 /**
367 * Triggers a change event.
368 *
369 * @param value
370 * map event
371 */
372 private void notify(MapEvent<String, byte[]> value) {
373 listeners.values().forEach(
374 commit -> commit.session().publish("change", value));
375 }
376
377 /**
378 * Handles an prepare commit.
379 *
380 * @param commit
381 * transaction prepare commit
382 * @return prepare result
383 */
384 protected PrepareResult prepare(
385 Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
386 boolean ok = false;
387 try {
Madan Jampani74da78b2016-02-09 21:18:36 -0800388 MapTransaction<String, byte[]> transaction = commit.operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800389 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800390 String key = update.key();
391 if (preparedKeys.contains(key)) {
392 return PrepareResult.CONCURRENT_TRANSACTION;
393 }
394 MapEntryValue existingValue = mapEntries.get(key);
395 if (existingValue == null) {
396 if (update.currentValue() != null) {
397 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
398 }
399 } else {
400 if (existingValue.version() != update.currentVersion()) {
401 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
402 }
403 }
404 }
405 // No violations detected. Add to pendingTranctions and mark
406 // modified keys as
407 // currently locked to updates.
Madan Jampani74da78b2016-02-09 21:18:36 -0800408 pendingTransactions.put(transaction.transactionId(), commit);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800409 transaction.updates().forEach(u -> preparedKeys.add(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800410 ok = true;
411 return PrepareResult.OK;
412 } finally {
413 if (!ok) {
414 commit.close();
415 }
416 }
417 }
418
419 /**
420 * Handles an commit commit (ha!).
421 *
422 * @param commit transaction commit commit
423 * @return commit result
424 */
425 protected CommitResult commit(
426 Commit<? extends AtomixConsistentMapCommands.TransactionCommit> commit) {
427 TransactionId transactionId = commit.operation().transactionId();
428 try {
429 Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
430 .remove(transactionId);
431 if (prepareCommit == null) {
432 return CommitResult.UNKNOWN_TRANSACTION_ID;
433 }
Madan Jampani74da78b2016-02-09 21:18:36 -0800434 MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800435 long totalReferencesToCommit = transaction
436 .updates()
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800437 .stream()
438 .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
439 .count();
440 CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer =
441 new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800442 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800443 String key = update.key();
444 MapEntryValue previousValue = mapEntries.remove(key);
445 MapEntryValue newValue = null;
446 checkState(preparedKeys.remove(key), "key is not prepared");
447 if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
448 newValue = new TransactionalCommit(key,
449 versionCounter.incrementAndGet(), completer);
450 }
451 mapEntries.put(key, newValue);
452 // Notify map listeners
453 notify(new MapEvent<>("", key, toVersioned(newValue),
454 toVersioned(previousValue)));
455 }
456 return CommitResult.OK;
457 } finally {
458 commit.close();
459 }
460 }
461
462 /**
463 * Handles an rollback commit (ha!).
464 *
465 * @param commit transaction rollback commit
466 * @return rollback result
467 */
468 protected RollbackResult rollback(
469 Commit<? extends AtomixConsistentMapCommands.TransactionRollback> commit) {
470 TransactionId transactionId = commit.operation().transactionId();
471 try {
472 Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
473 .remove(transactionId);
474 if (prepareCommit == null) {
475 return RollbackResult.UNKNOWN_TRANSACTION_ID;
476 } else {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800477 prepareCommit.operation()
478 .transaction()
479 .updates()
480 .forEach(u -> preparedKeys.remove(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800481 prepareCommit.close();
482 return RollbackResult.OK;
483 }
484 } finally {
485 commit.close();
486 }
487 }
488
489 private MapEntryUpdateResult.Status validate(
490 AtomixConsistentMapCommands.UpdateAndGet update) {
491 MapEntryValue existingValue = mapEntries.get(update.key());
492 if (existingValue == null && update.value() == null) {
493 return MapEntryUpdateResult.Status.NOOP;
494 }
495 if (preparedKeys.contains(update.key())) {
496 return MapEntryUpdateResult.Status.WRITE_LOCK;
497 }
498 byte[] existingRawValue = existingValue == null ? null : existingValue
499 .value();
500 Long existingVersion = existingValue == null ? null : existingValue
501 .version();
502 return update.valueMatch().matches(existingRawValue)
503 && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
504 : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
505 }
506
507 private Versioned<byte[]> toVersioned(MapEntryValue value) {
508 return value == null ? null : new Versioned<>(value.value(),
509 value.version());
510 }
511
512 @Override
513 public void register(Session session) {
514 }
515
516 @Override
517 public void unregister(Session session) {
518 closeListener(session.id());
519 }
520
521 @Override
522 public void expire(Session session) {
523 closeListener(session.id());
524 }
525
526 @Override
527 public void close(Session session) {
528 closeListener(session.id());
529 }
530
531 private void closeListener(Long sessionId) {
532 Commit<? extends AtomixConsistentMapCommands.Listen> commit = listeners
533 .remove(sessionId);
534 if (commit != null) {
535 commit.close();
536 }
537 }
538
539 /**
540 * Interface implemented by map values.
541 */
542 private interface MapEntryValue {
543 /**
544 * Returns the raw {@code byte[]}.
545 *
546 * @return raw value
547 */
548 byte[] value();
549
550 /**
551 * Returns the version of the value.
552 *
553 * @return version
554 */
555 long version();
556
557 /**
558 * Discards the value by invoke appropriate clean up actions.
559 */
560 void discard();
561 }
562
563 /**
564 * A {@code MapEntryValue} that is derived from a non-transactional update
565 * i.e. via any standard map update operation.
566 */
567 private class NonTransactionalCommit implements MapEntryValue {
568 private final long version;
569 private final Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit;
570
571 public NonTransactionalCommit(
572 long version,
573 Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
574 this.version = version;
575 this.commit = commit;
576 }
577
578 @Override
579 public byte[] value() {
580 return commit.operation().value();
581 }
582
583 @Override
584 public long version() {
585 return version;
586 }
587
588 @Override
589 public void discard() {
590 commit.close();
591 }
592 }
593
594 /**
595 * A {@code MapEntryValue} that is derived from updates submitted via a
596 * transaction.
597 */
598 private class TransactionalCommit implements MapEntryValue {
599 private final String key;
600 private final long version;
601 private final CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer;
602
603 public TransactionalCommit(
604 String key,
605 long version,
606 CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> commit) {
607 this.key = key;
608 this.version = version;
609 this.completer = commit;
610 }
611
612 @Override
613 public byte[] value() {
Madan Jampani74da78b2016-02-09 21:18:36 -0800614 MapTransaction<String, byte[]> transaction = completer.object().operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800615 return valueForKey(key, transaction);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800616 }
617
618 @Override
619 public long version() {
620 return version;
621 }
622
623 @Override
624 public void discard() {
625 completer.countDown();
626 }
Madan Jampanicadd70b2016-02-08 13:45:43 -0800627
Madan Jampani74da78b2016-02-09 21:18:36 -0800628 private byte[] valueForKey(String key, MapTransaction<String, byte[]> transaction) {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800629 MapUpdate<String, byte[]> update = transaction.updates()
630 .stream()
631 .filter(u -> u.key().equals(key))
632 .findFirst()
633 .orElse(null);
634 return update == null ? null : update.value();
635 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800636 }
637}