ONOS-2097: Ensure updates made via transactional map result in state change notifications
Change-Id: Iecc1b54d2c4c976278e77dbd825d3e3954c53602
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CommitResponse.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CommitResponse.java
new file mode 100644
index 0000000..4d3e4bc
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CommitResponse.java
@@ -0,0 +1,46 @@
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Result of a Transaction commit operation.
+ */
+public final class CommitResponse {
+
+ private boolean success;
+ private List<UpdateResult<String, byte[]>> updates;
+
+ public static CommitResponse success(List<UpdateResult<String, byte[]>> updates) {
+ return new CommitResponse(true, updates);
+ }
+
+ public static CommitResponse failure() {
+ return new CommitResponse(false, Collections.emptyList());
+ }
+
+ private CommitResponse(boolean success, List<UpdateResult<String, byte[]>> updates) {
+ this.success = success;
+ this.updates = ImmutableList.copyOf(updates);
+ }
+
+ public boolean success() {
+ return success;
+ }
+
+ public List<UpdateResult<String, byte[]>> updates() {
+ return updates;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("success", success)
+ .add("udpates", updates)
+ .toString();
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 667828e..08317b5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -193,7 +193,7 @@
* @param transaction transaction to commit (after preparation)
* @return A completable future to be completed with the result once complete
*/
- CompletableFuture<Boolean> prepareAndCommit(Transaction transaction);
+ CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction);
/**
* Prepare the specified transaction for commit. A successful prepare implies
@@ -213,7 +213,7 @@
* @param transaction transaction to commit
* @return A completable future to be completed with the result once complete
*/
- CompletableFuture<Boolean> commit(Transaction transaction);
+ CompletableFuture<CommitResponse> commit(Transaction transaction);
/**
* Rollback the specified transaction. A successful rollback implies
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
index b457b78..de73414 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
@@ -75,6 +75,7 @@
.register(Result.Status.class)
.register(DefaultTransaction.class)
.register(Transaction.State.class)
+ .register(org.onosproject.store.consistent.impl.CommitResponse.class)
.register(Match.class)
.register(NodeId.class)
.build();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index e2e532a..8b6db1e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -102,13 +102,13 @@
Long counterGet(String counterName);
@Command
- boolean prepareAndCommit(Transaction transaction);
+ CommitResponse prepareAndCommit(Transaction transaction);
@Command
boolean prepare(Transaction transaction);
@Command
- boolean commit(Transaction transaction);
+ CommitResponse commit(Transaction transaction);
@Command
boolean rollback(Transaction transaction);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 710f074..c9311c9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -32,6 +32,7 @@
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
+
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
@@ -47,6 +48,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP;
+import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.TX_COMMIT;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -83,7 +85,6 @@
private static final String REPLACE = "replace";
private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
-
private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
private final Logger log = getLogger(getClass());
@@ -127,6 +128,16 @@
MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
notifyListeners(mapEvent);
}
+ } else if (update.target() == TX_COMMIT) {
+ CommitResponse response = update.output();
+ if (response.success()) {
+ response.updates().forEach(u -> {
+ if (u.mapName().equals(name)) {
+ MapEvent<K, V> mapEvent = u.<K, V>map(this::dK, serializer::decode).toMapEvent();
+ notifyListeners(mapEvent);
+ }
+ });
+ }
}
});
});
@@ -439,4 +450,4 @@
});
}
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index b794f96..ba0b1be 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -174,7 +174,7 @@
}
@Override
- public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
+ public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
return checkOpen(() -> proxy.prepareAndCommit(transaction));
}
@@ -184,7 +184,7 @@
}
@Override
- public CompletableFuture<Boolean> commit(Transaction transaction) {
+ public CompletableFuture<CommitResponse> commit(Transaction transaction) {
return checkOpen(() -> proxy.commit(transaction));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index bd9cf69..219b847 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -31,11 +31,10 @@
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
-import org.onosproject.store.service.DatabaseUpdate.Type;
-
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import net.kuujo.copycat.state.Initializer;
@@ -239,11 +238,11 @@
}
@Override
- public boolean prepareAndCommit(Transaction transaction) {
+ public CommitResponse prepareAndCommit(Transaction transaction) {
if (prepare(transaction)) {
return commit(transaction);
}
- return false;
+ return CommitResponse.failure();
}
@Override
@@ -263,9 +262,9 @@
}
@Override
- public boolean commit(Transaction transaction) {
- transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
- return true;
+ public CommitResponse commit(Transaction transaction) {
+ return CommitResponse.success(Lists.transform(transaction.updates(),
+ update -> commitProvisionalUpdate(update, transaction.id())));
}
@Override
@@ -334,32 +333,16 @@
}
}
- private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+ private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
String mapName = update.mapName();
String key = update.key();
- Type type = update.type();
Update provisionalUpdate = getLockMap(mapName).get(key);
if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
getLockMap(mapName).remove(key);
} else {
- return;
+ throw new IllegalStateException("Invalid transaction Id");
}
-
- switch (type) {
- case PUT:
- case PUT_IF_ABSENT:
- case PUT_IF_VERSION_MATCH:
- case PUT_IF_VALUE_MATCH:
- mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value());
- break;
- case REMOVE:
- case REMOVE_IF_VERSION_MATCH:
- case REMOVE_IF_VALUE_MATCH:
- mapUpdate(mapName, key, Match.any(), Match.any(), null);
- break;
- default:
- break;
- }
+ return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value();
}
private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
index b0ab575..b66f424 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
@@ -25,12 +25,13 @@
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.TransactionContext;
-import org.onosproject.store.service.TransactionException;
import org.onosproject.store.service.TransactionalMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
/**
* Default TransactionContext implementation.
@@ -86,24 +87,30 @@
@SuppressWarnings("unchecked")
@Override
public void commit() {
+ // TODO: rework commit implementation to be more intuitive
checkState(isOpen, TX_NOT_OPEN_ERROR);
+ CommitResponse response = null;
try {
List<DatabaseUpdate> updates = Lists.newLinkedList();
- txMaps.values()
- .forEach(m -> { updates.addAll(m.prepareDatabaseUpdates()); });
- // FIXME: Updates made via transactional context currently do not result in notifications. (ONOS-2097)
- database.prepareAndCommit(new DefaultTransaction(transactionId, updates));
- } catch (Exception e) {
- abort();
- throw new TransactionException(e);
+ txMaps.values().forEach(m -> updates.addAll(m.prepareDatabaseUpdates()));
+ Transaction transaction = new DefaultTransaction(transactionId, updates);
+ response = Futures.getUnchecked(database.prepareAndCommit(transaction));
} finally {
+ if (response != null && !response.success()) {
+ abort();
+ }
isOpen = false;
}
}
@Override
public void abort() {
- checkState(isOpen, TX_NOT_OPEN_ERROR);
- txMaps.values().forEach(m -> m.rollback());
+ if (isOpen) {
+ try {
+ txMaps.values().forEach(m -> m.rollback());
+ } finally {
+ isOpen = false;
+ }
+ }
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 6c537c0..09b3f59 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -33,6 +33,7 @@
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -246,10 +247,10 @@
}
@Override
- public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
+ public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
if (subTransactions.isEmpty()) {
- return CompletableFuture.completedFuture(true);
+ return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of()));
} else if (subTransactions.size() == 1) {
Entry<Database, Transaction> entry =
subTransactions.entrySet().iterator().next();
@@ -277,13 +278,22 @@
}
@Override
- public CompletableFuture<Boolean> commit(Transaction transaction) {
+ public CompletableFuture<CommitResponse> commit(Transaction transaction) {
Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+ AtomicBoolean success = new AtomicBoolean(true);
+ List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
return CompletableFuture.allOf(subTransactions.entrySet()
- .stream()
- .map(entry -> entry.getKey().commit(entry.getValue()))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> true);
+ .stream()
+ .map(entry -> entry.getKey().commit(entry.getValue())
+ .thenAccept(response -> {
+ success.set(success.get() && response.success());
+ if (success.get()) {
+ allUpdates.addAll(response.updates());
+ }
+ }))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> success.get() ?
+ CommitResponse.success(allUpdates) : CommitResponse.failure());
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
index 1e2a09e..9b064b0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
@@ -32,6 +32,11 @@
MAP,
/**
+ * Update is a transaction commit.
+ */
+ TX_COMMIT,
+
+ /**
* Update is for a non-map data structure.
*/
OTHER
@@ -51,6 +56,8 @@
// FIXME: This check is brittle
if (operationName.contains("mapUpdate")) {
return Target.MAP;
+ } else if (operationName.contains("commit") || operationName.contains("prepareAndCommit")) {
+ return Target.TX_COMMIT;
} else {
return Target.OTHER;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
index db39a46..fc6e58d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
@@ -32,6 +32,8 @@
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.Transaction.State;
+import com.google.common.collect.ImmutableList;
+
/**
* Agent that runs the two phase commit protocol.
*/
@@ -71,15 +73,15 @@
* @return transaction result. Result value true indicates a successful commit, false
* indicates abort
*/
- public CompletableFuture<Boolean> execute(Transaction transaction) {
+ public CompletableFuture<CommitResponse> execute(Transaction transaction) {
// clean up if this transaction in already in a terminal state.
if (transaction.state() == Transaction.State.COMMITTED ||
transaction.state() == Transaction.State.ROLLEDBACK) {
- return transactions.remove(transaction.id()).thenApply(v -> true);
+ return transactions.remove(transaction.id()).thenApply(v -> CommitResponse.success(ImmutableList.of()));
} else if (transaction.state() == Transaction.State.COMMITTING) {
return commit(transaction);
} else if (transaction.state() == Transaction.State.ROLLINGBACK) {
- return rollback(transaction);
+ return rollback(transaction).thenApply(v -> CommitResponse.success(ImmutableList.of()));
} else {
return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
}
@@ -107,19 +109,18 @@
.thenApply(v -> status));
}
- private CompletableFuture<Boolean> commit(Transaction transaction) {
+ private CompletableFuture<CommitResponse> commit(Transaction transaction) {
return database.commit(transaction)
- .thenCompose(v -> transactions.put(
+ .whenComplete((r, e) -> transactions.put(
transaction.id(),
- transaction.transition(Transaction.State.COMMITTED)))
- .thenApply(v -> true);
+ transaction.transition(Transaction.State.COMMITTED)));
}
- private CompletableFuture<Boolean> rollback(Transaction transaction) {
+ private CompletableFuture<CommitResponse> rollback(Transaction transaction) {
return database.rollback(transaction)
.thenCompose(v -> transactions.put(
transaction.id(),
transaction.transition(Transaction.State.ROLLEDBACK)))
- .thenApply(v -> true);
+ .thenApply(v -> CommitResponse.failure());
}
}