Updates to ConsistentMap and LeaderElector state machines
Change-Id: I7734b253a56fef7300a8a094a3cfc8c1b45c2453
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index e580fed..72e52c2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -30,6 +30,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -39,11 +40,26 @@
import org.onlab.util.Match;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -52,13 +68,11 @@
/**
* State Machine for {@link AtomixConsistentMap} resource.
*/
-public class AtomixConsistentMapState extends ResourceStateMachine implements
- SessionListener, Snapshottable {
+public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
private final Set<String> preparedKeys = Sets.newHashSet();
- private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps
- .newHashMap();
+ private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
private AtomicLong versionCounter = new AtomicLong(0);
@Override
@@ -74,36 +88,23 @@
@Override
protected void configure(StateMachineExecutor executor) {
// Listeners
- executor.register(AtomixConsistentMapCommands.Listen.class,
- this::listen);
- executor.register(AtomixConsistentMapCommands.Unlisten.class,
- this::unlisten);
+ executor.register(Listen.class, this::listen);
+ executor.register(Unlisten.class, this::unlisten);
// Queries
- executor.register(AtomixConsistentMapCommands.ContainsKey.class,
- this::containsKey);
- executor.register(AtomixConsistentMapCommands.ContainsValue.class,
- this::containsValue);
- executor.register(AtomixConsistentMapCommands.EntrySet.class,
- this::entrySet);
- executor.register(AtomixConsistentMapCommands.Get.class, this::get);
- executor.register(AtomixConsistentMapCommands.IsEmpty.class,
- this::isEmpty);
- executor.register(AtomixConsistentMapCommands.KeySet.class,
- this::keySet);
- executor.register(AtomixConsistentMapCommands.Size.class, this::size);
- executor.register(AtomixConsistentMapCommands.Values.class,
- this::values);
+ executor.register(ContainsKey.class, this::containsKey);
+ executor.register(ContainsValue.class, this::containsValue);
+ executor.register(EntrySet.class, this::entrySet);
+ executor.register(Get.class, this::get);
+ executor.register(IsEmpty.class, this::isEmpty);
+ executor.register(KeySet.class, this::keySet);
+ executor.register(Size.class, this::size);
+ executor.register(Values.class, this::values);
// Commands
- executor.register(AtomixConsistentMapCommands.UpdateAndGet.class,
- this::updateAndGet);
+ executor.register(UpdateAndGet.class, this::updateAndGet);
executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
- executor.register(AtomixConsistentMapCommands.TransactionPrepare.class,
- this::prepare);
- executor.register(AtomixConsistentMapCommands.TransactionCommit.class,
- this::commit);
- executor.register(
- AtomixConsistentMapCommands.TransactionRollback.class,
- this::rollback);
+ executor.register(TransactionPrepare.class, this::prepare);
+ executor.register(TransactionCommit.class, this::commit);
+ executor.register(TransactionRollback.class, this::rollback);
}
@Override
@@ -120,12 +121,10 @@
/**
* Handles a contains key commit.
*
- * @param commit
- * containsKey commit
+ * @param commit containsKey commit
* @return {@code true} if map contains key
*/
- protected boolean containsKey(
- Commit<? extends AtomixConsistentMapCommands.ContainsKey> commit) {
+ protected boolean containsKey(Commit<? extends ContainsKey> commit) {
try {
return toVersioned(mapEntries.get(commit.operation().key())) != null;
} finally {
@@ -136,12 +135,10 @@
/**
* Handles a contains value commit.
*
- * @param commit
- * containsValue commit
+ * @param commit containsValue commit
* @return {@code true} if map contains value
*/
- protected boolean containsValue(
- Commit<? extends AtomixConsistentMapCommands.ContainsValue> commit) {
+ protected boolean containsValue(Commit<? extends ContainsValue> commit) {
try {
Match<byte[]> valueMatch = Match
.ifValue(commit.operation().value());
@@ -159,8 +156,7 @@
* get commit
* @return value mapped to key
*/
- protected Versioned<byte[]> get(
- Commit<? extends AtomixConsistentMapCommands.Get> commit) {
+ protected Versioned<byte[]> get(Commit<? extends Get> commit) {
try {
return toVersioned(mapEntries.get(commit.operation().key()));
} finally {
@@ -171,11 +167,10 @@
/**
* Handles a count commit.
*
- * @param commit
- * size commit
+ * @param commit size commit
* @return number of entries in map
*/
- protected int size(Commit<? extends AtomixConsistentMapCommands.Size> commit) {
+ protected int size(Commit<? extends Size> commit) {
try {
return mapEntries.size();
} finally {
@@ -186,12 +181,10 @@
/**
* Handles an is empty commit.
*
- * @param commit
- * isEmpty commit
+ * @param commit isEmpty commit
* @return {@code true} if map is empty
*/
- protected boolean isEmpty(
- Commit<? extends AtomixConsistentMapCommands.IsEmpty> commit) {
+ protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
try {
return mapEntries.isEmpty();
} finally {
@@ -202,14 +195,12 @@
/**
* Handles a keySet commit.
*
- * @param commit
- * keySet commit
+ * @param commit keySet commit
* @return set of keys in map
*/
- protected Set<String> keySet(
- Commit<? extends AtomixConsistentMapCommands.KeySet> commit) {
+ protected Set<String> keySet(Commit<? extends KeySet> commit) {
try {
- return mapEntries.keySet();
+ return mapEntries.keySet().stream().collect(Collectors.toSet());
} finally {
commit.close();
}
@@ -218,15 +209,12 @@
/**
* Handles a values commit.
*
- * @param commit
- * values commit
+ * @param commit values commit
* @return collection of values in map
*/
- protected Collection<Versioned<byte[]>> values(
- Commit<? extends AtomixConsistentMapCommands.Values> commit) {
+ protected Collection<Versioned<byte[]>> values(Commit<? extends Values> commit) {
try {
- return mapEntries.values().stream().map(this::toVersioned)
- .collect(Collectors.toList());
+ return mapEntries.values().stream().map(this::toVersioned).collect(Collectors.toList());
} finally {
commit.close();
}
@@ -239,8 +227,7 @@
* entrySet commit
* @return set of map entries
*/
- protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
- Commit<? extends AtomixConsistentMapCommands.EntrySet> commit) {
+ protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<? extends EntrySet> commit) {
try {
return mapEntries
.entrySet()
@@ -256,12 +243,10 @@
/**
* Handles a update and get commit.
*
- * @param commit
- * updateAndGet commit
+ * @param commit updateAndGet commit
* @return update result
*/
- protected MapEntryUpdateResult<String, byte[]> updateAndGet(
- Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
+ protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
String key = commit.operation().key();
MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
@@ -286,8 +271,10 @@
}
if (updateType == INSERT || updateType == UPDATE) {
mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
+ } else {
+ commit.close();
}
- notify(new MapEvent<>("", key, newMapValue, oldMapValue));
+ publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
newMapValue);
}
@@ -295,12 +282,10 @@
/**
* Handles a clear commit.
*
- * @param commit
- * clear commit
+ * @param commit clear commit
* @return clear result
*/
- protected MapEntryUpdateResult.Status clear(
- Commit<? extends AtomixConsistentMapCommands.Clear> commit) {
+ protected MapEntryUpdateResult.Status clear(Commit<? extends Clear> commit) {
try {
Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
.entrySet().iterator();
@@ -310,7 +295,7 @@
MapEntryValue value = entry.getValue();
Versioned<byte[]> removedValue = new Versioned<>(value.value(),
value.version());
- notify(new MapEvent<>("", key, null, removedValue));
+ publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
value.discard();
iterator.remove();
}
@@ -323,11 +308,9 @@
/**
* Handles a listen commit.
*
- * @param commit
- * listen commit
+ * @param commit listen commit
*/
- protected void listen(
- Commit<? extends AtomixConsistentMapCommands.Listen> commit) {
+ protected void listen(Commit<? extends Listen> commit) {
Long sessionId = commit.session().id();
listeners.put(sessionId, commit);
commit.session()
@@ -335,8 +318,7 @@
state -> {
if (state == Session.State.CLOSED
|| state == Session.State.EXPIRED) {
- Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
- .remove(sessionId);
+ Commit<? extends Listen> listener = listeners.remove(sessionId);
if (listener != null) {
listener.close();
}
@@ -347,14 +329,12 @@
/**
* Handles an unlisten commit.
*
- * @param commit
- * unlisten commit
+ * @param commit unlisten commit
*/
protected void unlisten(
- Commit<? extends AtomixConsistentMapCommands.Unlisten> commit) {
+ Commit<? extends Unlisten> commit) {
try {
- Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
- .remove(commit.session());
+ Commit<? extends Listen> listener = listeners.remove(commit.session());
if (listener != null) {
listener.close();
}
@@ -364,25 +344,12 @@
}
/**
- * Triggers a change event.
- *
- * @param value
- * map event
- */
- private void notify(MapEvent<String, byte[]> value) {
- listeners.values().forEach(
- commit -> commit.session().publish("change", value));
- }
-
- /**
* Handles an prepare commit.
*
- * @param commit
- * transaction prepare commit
+ * @param commit transaction prepare commit
* @return prepare result
*/
- protected PrepareResult prepare(
- Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
+ protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
boolean ok = false;
try {
MapTransaction<String, byte[]> transaction = commit.operation().transaction();
@@ -403,8 +370,7 @@
}
}
// No violations detected. Add to pendingTranctions and mark
- // modified keys as
- // currently locked to updates.
+ // modified keys as locked for updates.
pendingTransactions.put(transaction.transactionId(), commit);
transaction.updates().forEach(u -> preparedKeys.add(u.key()));
ok = true;
@@ -422,11 +388,10 @@
* @param commit transaction commit commit
* @return commit result
*/
- protected CommitResult commit(
- Commit<? extends AtomixConsistentMapCommands.TransactionCommit> commit) {
+ protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
TransactionId transactionId = commit.operation().transactionId();
try {
- Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
+ Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
.remove(transactionId);
if (prepareCommit == null) {
return CommitResult.UNKNOWN_TRANSACTION_ID;
@@ -437,8 +402,9 @@
.stream()
.filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.count();
- CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer =
+ CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
+ List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
for (MapUpdate<String, byte[]> update : transaction.updates()) {
String key = update.key();
MapEntryValue previousValue = mapEntries.remove(key);
@@ -448,11 +414,15 @@
newValue = new TransactionalCommit(key,
versionCounter.incrementAndGet(), completer);
}
- mapEntries.put(key, newValue);
- // Notify map listeners
- notify(new MapEvent<>("", key, toVersioned(newValue),
- toVersioned(previousValue)));
+ eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
+ if (newValue != null) {
+ mapEntries.put(key, newValue);
+ }
+ if (previousValue != null) {
+ previousValue.discard();
+ }
}
+ publish(eventsToPublish);
return CommitResult.OK;
} finally {
commit.close();
@@ -465,12 +435,10 @@
* @param commit transaction rollback commit
* @return rollback result
*/
- protected RollbackResult rollback(
- Commit<? extends AtomixConsistentMapCommands.TransactionRollback> commit) {
+ protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
TransactionId transactionId = commit.operation().transactionId();
try {
- Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
- .remove(transactionId);
+ Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions.remove(transactionId);
if (prepareCommit == null) {
return RollbackResult.UNKNOWN_TRANSACTION_ID;
} else {
@@ -486,8 +454,14 @@
}
}
- private MapEntryUpdateResult.Status validate(
- AtomixConsistentMapCommands.UpdateAndGet update) {
+ /**
+ * Computes the update status that would result if the specified update were to applied to
+ * the state machine.
+ *
+ * @param update update
+ * @return status
+ */
+ private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
MapEntryValue existingValue = mapEntries.get(update.key());
if (existingValue == null && update.value() == null) {
return MapEntryUpdateResult.Status.NOOP;
@@ -504,9 +478,22 @@
: MapEntryUpdateResult.Status.PRECONDITION_FAILED;
}
+ /**
+ * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
+ * @param value map entry value
+ * @return versioned instance
+ */
private Versioned<byte[]> toVersioned(MapEntryValue value) {
- return value == null ? null : new Versioned<>(value.value(),
- value.version());
+ return value == null ? null : new Versioned<>(value.value(), value.version());
+ }
+
+ /**
+ * Publishes events to listeners.
+ *
+ * @param events list of map event to publish
+ */
+ private void publish(List<MapEvent<String, byte[]>> events) {
+ listeners.values().forEach(commit -> commit.session().publish(AtomixConsistentMap.CHANGE_SUBJECT, events));
}
@Override
@@ -529,8 +516,7 @@
}
private void closeListener(Long sessionId) {
- Commit<? extends AtomixConsistentMapCommands.Listen> commit = listeners
- .remove(sessionId);
+ Commit<? extends Listen> commit = listeners.remove(sessionId);
if (commit != null) {
commit.close();
}
@@ -566,11 +552,9 @@
*/
private class NonTransactionalCommit implements MapEntryValue {
private final long version;
- private final Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit;
+ private final Commit<? extends UpdateAndGet> commit;
- public NonTransactionalCommit(
- long version,
- Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
+ public NonTransactionalCommit(long version, Commit<? extends UpdateAndGet> commit) {
this.version = version;
this.commit = commit;
}
@@ -598,12 +582,12 @@
private class TransactionalCommit implements MapEntryValue {
private final String key;
private final long version;
- private final CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer;
+ private final CountDownCompleter<Commit<? extends TransactionPrepare>> completer;
public TransactionalCommit(
String key,
long version,
- CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> commit) {
+ CountDownCompleter<Commit<? extends TransactionPrepare>> commit) {
this.key = key;
this.version = version;
this.completer = commit;