ONOS-2097: Ensure updates made via transactional map result in state change notifications

Change-Id: Iecc1b54d2c4c976278e77dbd825d3e3954c53602
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CommitResponse.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CommitResponse.java
new file mode 100644
index 0000000..4d3e4bc
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CommitResponse.java
@@ -0,0 +1,46 @@
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Result of a Transaction commit operation.
+ */
+public final class CommitResponse {
+
+    private boolean success;
+    private List<UpdateResult<String, byte[]>> updates;
+
+    public static CommitResponse success(List<UpdateResult<String, byte[]>> updates) {
+        return new CommitResponse(true, updates);
+    }
+
+    public static CommitResponse failure() {
+        return new CommitResponse(false, Collections.emptyList());
+    }
+
+    private CommitResponse(boolean success, List<UpdateResult<String, byte[]>> updates) {
+        this.success = success;
+        this.updates = ImmutableList.copyOf(updates);
+    }
+
+    public boolean success() {
+        return success;
+    }
+
+    public List<UpdateResult<String, byte[]>> updates() {
+        return updates;
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("success", success)
+                .add("udpates", updates)
+                .toString();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 667828e..08317b5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -193,7 +193,7 @@
      * @param transaction transaction to commit (after preparation)
      * @return A completable future to be completed with the result once complete
      */
-    CompletableFuture<Boolean> prepareAndCommit(Transaction transaction);
+    CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction);
 
     /**
      * Prepare the specified transaction for commit. A successful prepare implies
@@ -213,7 +213,7 @@
      * @param transaction transaction to commit
      * @return A completable future to be completed with the result once complete
      */
-    CompletableFuture<Boolean> commit(Transaction transaction);
+    CompletableFuture<CommitResponse> commit(Transaction transaction);
 
     /**
      * Rollback the specified transaction. A successful rollback implies
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
index b457b78..de73414 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
@@ -75,6 +75,7 @@
             .register(Result.Status.class)
             .register(DefaultTransaction.class)
             .register(Transaction.State.class)
+            .register(org.onosproject.store.consistent.impl.CommitResponse.class)
             .register(Match.class)
             .register(NodeId.class)
             .build();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index e2e532a..8b6db1e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -102,13 +102,13 @@
   Long counterGet(String counterName);
 
   @Command
-  boolean prepareAndCommit(Transaction transaction);
+  CommitResponse prepareAndCommit(Transaction transaction);
 
   @Command
   boolean prepare(Transaction transaction);
 
   @Command
-  boolean commit(Transaction transaction);
+  CommitResponse commit(Transaction transaction);
 
   @Command
   boolean rollback(Transaction transaction);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 710f074..c9311c9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -32,6 +32,7 @@
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -47,6 +48,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP;
+import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.TX_COMMIT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -83,7 +85,6 @@
     private static final String REPLACE = "replace";
     private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
 
-
     private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
 
     private final Logger log = getLogger(getClass());
@@ -127,6 +128,16 @@
                         MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
                         notifyListeners(mapEvent);
                     }
+                } else if (update.target() == TX_COMMIT) {
+                    CommitResponse response = update.output();
+                    if (response.success()) {
+                        response.updates().forEach(u -> {
+                            if (u.mapName().equals(name)) {
+                                MapEvent<K, V> mapEvent = u.<K, V>map(this::dK, serializer::decode).toMapEvent();
+                                notifyListeners(mapEvent);
+                            }
+                        });
+                    }
                 }
             });
         });
@@ -439,4 +450,4 @@
         });
     }
 
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index b794f96..ba0b1be 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -174,7 +174,7 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
+    public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
         return checkOpen(() -> proxy.prepareAndCommit(transaction));
     }
 
@@ -184,7 +184,7 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> commit(Transaction transaction) {
+    public CompletableFuture<CommitResponse> commit(Transaction transaction) {
         return checkOpen(() -> proxy.commit(transaction));
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index bd9cf69..219b847 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -31,11 +31,10 @@
 import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
-import org.onosproject.store.service.DatabaseUpdate.Type;
-
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import net.kuujo.copycat.state.Initializer;
@@ -239,11 +238,11 @@
     }
 
     @Override
-    public boolean prepareAndCommit(Transaction transaction) {
+    public CommitResponse prepareAndCommit(Transaction transaction) {
         if (prepare(transaction)) {
             return commit(transaction);
         }
-        return false;
+        return CommitResponse.failure();
     }
 
     @Override
@@ -263,9 +262,9 @@
     }
 
     @Override
-    public boolean commit(Transaction transaction) {
-        transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
-        return true;
+    public CommitResponse commit(Transaction transaction) {
+        return CommitResponse.success(Lists.transform(transaction.updates(),
+                                                      update -> commitProvisionalUpdate(update, transaction.id())));
     }
 
     @Override
@@ -334,32 +333,16 @@
         }
     }
 
-    private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+    private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
         String mapName = update.mapName();
         String key = update.key();
-        Type type = update.type();
         Update provisionalUpdate = getLockMap(mapName).get(key);
         if (Objects.equal(transactionId, provisionalUpdate.transactionId()))  {
             getLockMap(mapName).remove(key);
         } else {
-            return;
+            throw new IllegalStateException("Invalid transaction Id");
         }
-
-        switch (type) {
-        case PUT:
-        case PUT_IF_ABSENT:
-        case PUT_IF_VERSION_MATCH:
-        case PUT_IF_VALUE_MATCH:
-            mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value());
-            break;
-        case REMOVE:
-        case REMOVE_IF_VERSION_MATCH:
-        case REMOVE_IF_VALUE_MATCH:
-            mapUpdate(mapName, key, Match.any(), Match.any(), null);
-            break;
-        default:
-            break;
-        }
+        return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value();
     }
 
     private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
index b0ab575..b66f424 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
@@ -25,12 +25,13 @@
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.TransactionContext;
-import org.onosproject.store.service.TransactionException;
 import org.onosproject.store.service.TransactionalMap;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
 
 /**
  * Default TransactionContext implementation.
@@ -86,24 +87,30 @@
     @SuppressWarnings("unchecked")
     @Override
     public void commit() {
+        // TODO: rework commit implementation to be more intuitive
         checkState(isOpen, TX_NOT_OPEN_ERROR);
+        CommitResponse response = null;
         try {
             List<DatabaseUpdate> updates = Lists.newLinkedList();
-            txMaps.values()
-                  .forEach(m -> { updates.addAll(m.prepareDatabaseUpdates()); });
-            // FIXME: Updates made via transactional context currently do not result in notifications. (ONOS-2097)
-            database.prepareAndCommit(new DefaultTransaction(transactionId, updates));
-        } catch (Exception e) {
-            abort();
-            throw new TransactionException(e);
+            txMaps.values().forEach(m -> updates.addAll(m.prepareDatabaseUpdates()));
+            Transaction transaction = new DefaultTransaction(transactionId, updates);
+            response = Futures.getUnchecked(database.prepareAndCommit(transaction));
         } finally {
+            if (response != null && !response.success()) {
+                abort();
+            }
             isOpen = false;
         }
     }
 
     @Override
     public void abort() {
-        checkState(isOpen, TX_NOT_OPEN_ERROR);
-        txMaps.values().forEach(m -> m.rollback());
+        if (isOpen) {
+            try {
+                txMaps.values().forEach(m -> m.rollback());
+            } finally {
+                isOpen = false;
+            }
+        }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 6c537c0..09b3f59 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -33,6 +33,7 @@
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -246,10 +247,10 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
+    public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
         Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
         if (subTransactions.isEmpty()) {
-            return CompletableFuture.completedFuture(true);
+            return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of()));
         } else if (subTransactions.size() == 1) {
             Entry<Database, Transaction> entry =
                     subTransactions.entrySet().iterator().next();
@@ -277,13 +278,22 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> commit(Transaction transaction) {
+    public CompletableFuture<CommitResponse> commit(Transaction transaction) {
         Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+        AtomicBoolean success = new AtomicBoolean(true);
+        List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
         return CompletableFuture.allOf(subTransactions.entrySet()
-                .stream()
-                .map(entry -> entry.getKey().commit(entry.getValue()))
-                .toArray(CompletableFuture[]::new))
-        .thenApply(v -> true);
+                                   .stream()
+                                   .map(entry -> entry.getKey().commit(entry.getValue())
+                                                           .thenAccept(response -> {
+                                                               success.set(success.get() && response.success());
+                                                               if (success.get()) {
+                                                                   allUpdates.addAll(response.updates());
+                                                               }
+                                                           }))
+                                   .toArray(CompletableFuture[]::new))
+                               .thenApply(v -> success.get() ?
+                                       CommitResponse.success(allUpdates) : CommitResponse.failure());
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
index 1e2a09e..9b064b0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
@@ -32,6 +32,11 @@
         MAP,
 
         /**
+         * Update is a transaction commit.
+         */
+        TX_COMMIT,
+
+        /**
          * Update is for a non-map data structure.
          */
         OTHER
@@ -51,6 +56,8 @@
         // FIXME: This check is brittle
         if (operationName.contains("mapUpdate")) {
             return Target.MAP;
+        } else if (operationName.contains("commit") || operationName.contains("prepareAndCommit")) {
+            return Target.TX_COMMIT;
         } else {
             return Target.OTHER;
         }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
index db39a46..fc6e58d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
@@ -32,6 +32,8 @@
 import org.onosproject.store.service.Versioned;
 import org.onosproject.store.service.Transaction.State;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * Agent that runs the two phase commit protocol.
  */
@@ -71,15 +73,15 @@
      * @return transaction result. Result value true indicates a successful commit, false
      * indicates abort
      */
-    public CompletableFuture<Boolean> execute(Transaction transaction) {
+    public CompletableFuture<CommitResponse> execute(Transaction transaction) {
         // clean up if this transaction in already in a terminal state.
         if (transaction.state() == Transaction.State.COMMITTED ||
                 transaction.state() == Transaction.State.ROLLEDBACK) {
-            return transactions.remove(transaction.id()).thenApply(v -> true);
+            return transactions.remove(transaction.id()).thenApply(v -> CommitResponse.success(ImmutableList.of()));
         } else if (transaction.state() == Transaction.State.COMMITTING) {
             return commit(transaction);
         } else if (transaction.state() == Transaction.State.ROLLINGBACK) {
-            return rollback(transaction);
+            return rollback(transaction).thenApply(v -> CommitResponse.success(ImmutableList.of()));
         } else {
             return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
         }
@@ -107,19 +109,18 @@
                 .thenApply(v -> status));
     }
 
-    private CompletableFuture<Boolean> commit(Transaction transaction) {
+    private CompletableFuture<CommitResponse> commit(Transaction transaction) {
         return database.commit(transaction)
-                .thenCompose(v -> transactions.put(
+                .whenComplete((r, e) -> transactions.put(
                             transaction.id(),
-                            transaction.transition(Transaction.State.COMMITTED)))
-                .thenApply(v -> true);
+                            transaction.transition(Transaction.State.COMMITTED)));
     }
 
-    private CompletableFuture<Boolean> rollback(Transaction transaction) {
+    private CompletableFuture<CommitResponse> rollback(Transaction transaction) {
         return database.rollback(transaction)
                 .thenCompose(v -> transactions.put(
                             transaction.id(),
                             transaction.transition(Transaction.State.ROLLEDBACK)))
-                .thenApply(v -> true);
+                .thenApply(v -> CommitResponse.failure());
     }
 }