/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.resources.impl;

import static com.google.common.base.Preconditions.checkState;
import static org.onosproject.store.service.MapEvent.Type.INSERT;
import static org.onosproject.store.service.MapEvent.Type.REMOVE;
import static org.onosproject.store.service.MapEvent.Type.UPDATE;

import io.atomix.copycat.client.session.Session;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.onlab.util.CountDownCompleter;
import org.onlab.util.Match;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Versioned;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

49/**
50 * State Machine for {@link AtomixConsistentMap} resource.
51 */
52public class AtomixConsistentMapState extends ResourceStateMachine implements
53 SessionListener, Snapshottable {
54 private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
55 private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
56 private final Set<String> preparedKeys = Sets.newHashSet();
57 private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps
58 .newHashMap();
59 private AtomicLong versionCounter = new AtomicLong(0);
60
61 @Override
62 public void snapshot(SnapshotWriter writer) {
63 writer.writeLong(versionCounter.get());
64 }
65
66 @Override
67 public void install(SnapshotReader reader) {
68 versionCounter = new AtomicLong(reader.readLong());
69 }
70
71 @Override
72 protected void configure(StateMachineExecutor executor) {
73 // Listeners
74 executor.register(AtomixConsistentMapCommands.Listen.class,
75 this::listen);
76 executor.register(AtomixConsistentMapCommands.Unlisten.class,
77 this::unlisten);
78 // Queries
79 executor.register(AtomixConsistentMapCommands.ContainsKey.class,
80 this::containsKey);
81 executor.register(AtomixConsistentMapCommands.ContainsValue.class,
82 this::containsValue);
83 executor.register(AtomixConsistentMapCommands.EntrySet.class,
84 this::entrySet);
85 executor.register(AtomixConsistentMapCommands.Get.class, this::get);
86 executor.register(AtomixConsistentMapCommands.IsEmpty.class,
87 this::isEmpty);
88 executor.register(AtomixConsistentMapCommands.KeySet.class,
89 this::keySet);
90 executor.register(AtomixConsistentMapCommands.Size.class, this::size);
91 executor.register(AtomixConsistentMapCommands.Values.class,
92 this::values);
93 // Commands
94 executor.register(AtomixConsistentMapCommands.UpdateAndGet.class,
95 this::updateAndGet);
96 executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
97 executor.register(AtomixConsistentMapCommands.TransactionPrepare.class,
98 this::prepare);
99 executor.register(AtomixConsistentMapCommands.TransactionCommit.class,
100 this::commit);
101 executor.register(
102 AtomixConsistentMapCommands.TransactionRollback.class,
103 this::rollback);
104 }
105
106 @Override
107 public void delete() {
108 // Delete Listeners
109 listeners.values().forEach(Commit::close);
110 listeners.clear();
111
112 // Delete Map entries
113 mapEntries.values().forEach(MapEntryValue::discard);
114 mapEntries.clear();
115 }
116
117 /**
118 * Handles a contains key commit.
119 *
120 * @param commit
121 * containsKey commit
122 * @return {@code true} if map contains key
123 */
124 protected boolean containsKey(
125 Commit<? extends AtomixConsistentMapCommands.ContainsKey> commit) {
126 try {
127 return toVersioned(mapEntries.get(commit.operation().key())) != null;
128 } finally {
129 commit.close();
130 }
131 }
132
133 /**
134 * Handles a contains value commit.
135 *
136 * @param commit
137 * containsValue commit
138 * @return {@code true} if map contains value
139 */
140 protected boolean containsValue(
141 Commit<? extends AtomixConsistentMapCommands.ContainsValue> commit) {
142 try {
143 Match<byte[]> valueMatch = Match
144 .ifValue(commit.operation().value());
145 return mapEntries.values().stream()
146 .anyMatch(value -> valueMatch.matches(value.value()));
147 } finally {
148 commit.close();
149 }
150 }
151
152 /**
153 * Handles a get commit.
154 *
155 * @param commit
156 * get commit
157 * @return value mapped to key
158 */
159 protected Versioned<byte[]> get(
160 Commit<? extends AtomixConsistentMapCommands.Get> commit) {
161 try {
162 return toVersioned(mapEntries.get(commit.operation().key()));
163 } finally {
164 commit.close();
165 }
166 }
167
168 /**
169 * Handles a count commit.
170 *
171 * @param commit
172 * size commit
173 * @return number of entries in map
174 */
175 protected int size(Commit<? extends AtomixConsistentMapCommands.Size> commit) {
176 try {
177 return mapEntries.size();
178 } finally {
179 commit.close();
180 }
181 }
182
183 /**
184 * Handles an is empty commit.
185 *
186 * @param commit
187 * isEmpty commit
188 * @return {@code true} if map is empty
189 */
190 protected boolean isEmpty(
191 Commit<? extends AtomixConsistentMapCommands.IsEmpty> commit) {
192 try {
193 return mapEntries.isEmpty();
194 } finally {
195 commit.close();
196 }
197 }
198
199 /**
200 * Handles a keySet commit.
201 *
202 * @param commit
203 * keySet commit
204 * @return set of keys in map
205 */
206 protected Set<String> keySet(
207 Commit<? extends AtomixConsistentMapCommands.KeySet> commit) {
208 try {
209 return mapEntries.keySet();
210 } finally {
211 commit.close();
212 }
213 }
214
215 /**
216 * Handles a values commit.
217 *
218 * @param commit
219 * values commit
220 * @return collection of values in map
221 */
222 protected Collection<Versioned<byte[]>> values(
223 Commit<? extends AtomixConsistentMapCommands.Values> commit) {
224 try {
225 return mapEntries.values().stream().map(this::toVersioned)
226 .collect(Collectors.toList());
227 } finally {
228 commit.close();
229 }
230 }
231
232 /**
233 * Handles a entry set commit.
234 *
235 * @param commit
236 * entrySet commit
237 * @return set of map entries
238 */
239 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
240 Commit<? extends AtomixConsistentMapCommands.EntrySet> commit) {
241 try {
242 return mapEntries
243 .entrySet()
244 .stream()
245 .map(e -> Maps.immutableEntry(e.getKey(),
246 toVersioned(e.getValue())))
247 .collect(Collectors.toSet());
248 } finally {
249 commit.close();
250 }
251 }
252
253 /**
254 * Handles a update and get commit.
255 *
256 * @param commit
257 * updateAndGet commit
258 * @return update result
259 */
260 protected MapEntryUpdateResult<String, byte[]> updateAndGet(
261 Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
262 MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
263 String key = commit.operation().key();
264 MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
265 Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
266
267 if (updateStatus != MapEntryUpdateResult.Status.OK) {
268 commit.close();
269 return new MapEntryUpdateResult<>(updateStatus, "", key,
270 oldMapValue, oldMapValue);
271 }
272
273 byte[] newValue = commit.operation().value();
274 long newVersion = versionCounter.incrementAndGet();
275 Versioned<byte[]> newMapValue = newValue == null ? null
276 : new Versioned<>(newValue, newVersion);
277
278 MapEvent.Type updateType = newValue == null ? REMOVE
279 : oldCommitValue == null ? INSERT : UPDATE;
280 if (updateType == REMOVE || updateType == UPDATE) {
281 mapEntries.remove(key);
282 oldCommitValue.discard();
283 }
284 if (updateType == INSERT || updateType == UPDATE) {
285 mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
286 }
287 notify(new MapEvent<>("", key, newMapValue, oldMapValue));
288 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
289 newMapValue);
290 }
291
292 /**
293 * Handles a clear commit.
294 *
295 * @param commit
296 * clear commit
297 * @return clear result
298 */
299 protected MapEntryUpdateResult.Status clear(
300 Commit<? extends AtomixConsistentMapCommands.Clear> commit) {
301 try {
302 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
303 .entrySet().iterator();
304 while (iterator.hasNext()) {
305 Map.Entry<String, MapEntryValue> entry = iterator.next();
306 String key = entry.getKey();
307 MapEntryValue value = entry.getValue();
308 Versioned<byte[]> removedValue = new Versioned<>(value.value(),
309 value.version());
310 notify(new MapEvent<>("", key, null, removedValue));
311 value.discard();
312 iterator.remove();
313 }
314 return MapEntryUpdateResult.Status.OK;
315 } finally {
316 commit.close();
317 }
318 }
319
320 /**
321 * Handles a listen commit.
322 *
323 * @param commit
324 * listen commit
325 */
326 protected void listen(
327 Commit<? extends AtomixConsistentMapCommands.Listen> commit) {
328 Long sessionId = commit.session().id();
329 listeners.put(sessionId, commit);
330 commit.session()
331 .onStateChange(
332 state -> {
333 if (state == Session.State.CLOSED
334 || state == Session.State.EXPIRED) {
335 Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
336 .remove(sessionId);
337 if (listener != null) {
338 listener.close();
339 }
340 }
341 });
342 }
343
344 /**
345 * Handles an unlisten commit.
346 *
347 * @param commit
348 * unlisten commit
349 */
350 protected void unlisten(
351 Commit<? extends AtomixConsistentMapCommands.Unlisten> commit) {
352 try {
353 Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
354 .remove(commit.session());
355 if (listener != null) {
356 listener.close();
357 }
358 } finally {
359 commit.close();
360 }
361 }
362
363 /**
364 * Triggers a change event.
365 *
366 * @param value
367 * map event
368 */
369 private void notify(MapEvent<String, byte[]> value) {
370 listeners.values().forEach(
371 commit -> commit.session().publish("change", value));
372 }
373
374 /**
375 * Handles an prepare commit.
376 *
377 * @param commit
378 * transaction prepare commit
379 * @return prepare result
380 */
381 protected PrepareResult prepare(
382 Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
383 boolean ok = false;
384 try {
385 TransactionalMapUpdate<String, byte[]> transactionUpdate = commit
386 .operation().transactionUpdate();
387 for (MapUpdate<String, byte[]> update : transactionUpdate.batch()) {
388 String key = update.key();
389 if (preparedKeys.contains(key)) {
390 return PrepareResult.CONCURRENT_TRANSACTION;
391 }
392 MapEntryValue existingValue = mapEntries.get(key);
393 if (existingValue == null) {
394 if (update.currentValue() != null) {
395 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
396 }
397 } else {
398 if (existingValue.version() != update.currentVersion()) {
399 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
400 }
401 }
402 }
403 // No violations detected. Add to pendingTranctions and mark
404 // modified keys as
405 // currently locked to updates.
406 pendingTransactions.put(transactionUpdate.transactionId(), commit);
407 transactionUpdate.batch().forEach(u -> preparedKeys.add(u.key()));
408 ok = true;
409 return PrepareResult.OK;
410 } finally {
411 if (!ok) {
412 commit.close();
413 }
414 }
415 }
416
417 /**
418 * Handles an commit commit (ha!).
419 *
420 * @param commit transaction commit commit
421 * @return commit result
422 */
423 protected CommitResult commit(
424 Commit<? extends AtomixConsistentMapCommands.TransactionCommit> commit) {
425 TransactionId transactionId = commit.operation().transactionId();
426 try {
427 Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
428 .remove(transactionId);
429 if (prepareCommit == null) {
430 return CommitResult.UNKNOWN_TRANSACTION_ID;
431 }
432 TransactionalMapUpdate<String, byte[]> transactionalUpdate = prepareCommit
433 .operation().transactionUpdate();
434 long totalReferencesToCommit = transactionalUpdate
435 .batch()
436 .stream()
437 .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
438 .count();
439 CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer =
440 new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
441 for (MapUpdate<String, byte[]> update : transactionalUpdate.batch()) {
442 String key = update.key();
443 MapEntryValue previousValue = mapEntries.remove(key);
444 MapEntryValue newValue = null;
445 checkState(preparedKeys.remove(key), "key is not prepared");
446 if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
447 newValue = new TransactionalCommit(key,
448 versionCounter.incrementAndGet(), completer);
449 }
450 mapEntries.put(key, newValue);
451 // Notify map listeners
452 notify(new MapEvent<>("", key, toVersioned(newValue),
453 toVersioned(previousValue)));
454 }
455 return CommitResult.OK;
456 } finally {
457 commit.close();
458 }
459 }
460
461 /**
462 * Handles an rollback commit (ha!).
463 *
464 * @param commit transaction rollback commit
465 * @return rollback result
466 */
467 protected RollbackResult rollback(
468 Commit<? extends AtomixConsistentMapCommands.TransactionRollback> commit) {
469 TransactionId transactionId = commit.operation().transactionId();
470 try {
471 Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
472 .remove(transactionId);
473 if (prepareCommit == null) {
474 return RollbackResult.UNKNOWN_TRANSACTION_ID;
475 } else {
476 prepareCommit.operation().transactionUpdate().batch()
477 .forEach(u -> preparedKeys.remove(u.key()));
478 prepareCommit.close();
479 return RollbackResult.OK;
480 }
481 } finally {
482 commit.close();
483 }
484 }
485
486 private MapEntryUpdateResult.Status validate(
487 AtomixConsistentMapCommands.UpdateAndGet update) {
488 MapEntryValue existingValue = mapEntries.get(update.key());
489 if (existingValue == null && update.value() == null) {
490 return MapEntryUpdateResult.Status.NOOP;
491 }
492 if (preparedKeys.contains(update.key())) {
493 return MapEntryUpdateResult.Status.WRITE_LOCK;
494 }
495 byte[] existingRawValue = existingValue == null ? null : existingValue
496 .value();
497 Long existingVersion = existingValue == null ? null : existingValue
498 .version();
499 return update.valueMatch().matches(existingRawValue)
500 && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
501 : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
502 }
503
504 private Versioned<byte[]> toVersioned(MapEntryValue value) {
505 return value == null ? null : new Versioned<>(value.value(),
506 value.version());
507 }
508
509 @Override
510 public void register(Session session) {
511 }
512
513 @Override
514 public void unregister(Session session) {
515 closeListener(session.id());
516 }
517
518 @Override
519 public void expire(Session session) {
520 closeListener(session.id());
521 }
522
523 @Override
524 public void close(Session session) {
525 closeListener(session.id());
526 }
527
528 private void closeListener(Long sessionId) {
529 Commit<? extends AtomixConsistentMapCommands.Listen> commit = listeners
530 .remove(sessionId);
531 if (commit != null) {
532 commit.close();
533 }
534 }
535
536 /**
537 * Interface implemented by map values.
538 */
539 private interface MapEntryValue {
540 /**
541 * Returns the raw {@code byte[]}.
542 *
543 * @return raw value
544 */
545 byte[] value();
546
547 /**
548 * Returns the version of the value.
549 *
550 * @return version
551 */
552 long version();
553
554 /**
555 * Discards the value by invoke appropriate clean up actions.
556 */
557 void discard();
558 }
559
560 /**
561 * A {@code MapEntryValue} that is derived from a non-transactional update
562 * i.e. via any standard map update operation.
563 */
564 private class NonTransactionalCommit implements MapEntryValue {
565 private final long version;
566 private final Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit;
567
568 public NonTransactionalCommit(
569 long version,
570 Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
571 this.version = version;
572 this.commit = commit;
573 }
574
575 @Override
576 public byte[] value() {
577 return commit.operation().value();
578 }
579
580 @Override
581 public long version() {
582 return version;
583 }
584
585 @Override
586 public void discard() {
587 commit.close();
588 }
589 }
590
591 /**
592 * A {@code MapEntryValue} that is derived from updates submitted via a
593 * transaction.
594 */
595 private class TransactionalCommit implements MapEntryValue {
596 private final String key;
597 private final long version;
598 private final CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer;
599
600 public TransactionalCommit(
601 String key,
602 long version,
603 CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> commit) {
604 this.key = key;
605 this.version = version;
606 this.completer = commit;
607 }
608
609 @Override
610 public byte[] value() {
611 TransactionalMapUpdate<String, byte[]> update = completer.object()
612 .operation().transactionUpdate();
613 return update.valueForKey(key);
614 }
615
616 @Override
617 public long version() {
618 return version;
619 }
620
621 @Override
622 public void discard() {
623 completer.countDown();
624 }
625 }
626}