ONOS-4218: Fixes for resource store transaction failures

Change-Id: Ie48bb04d7daf6ed7b63c33a3c3c2703496179aa6
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
index 43cacb3..30a6f26 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
@@ -27,6 +27,7 @@
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.CommitResult;
+import org.onosproject.store.service.CommitStatus;
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TransactionContext;
@@ -96,22 +97,23 @@
 
     @SuppressWarnings("unchecked")
     @Override
-    public boolean commit() {
+    public CompletableFuture<CommitStatus> commit() {
         // TODO: rework commit implementation to be more intuitive
         checkState(isOpen, TX_NOT_OPEN_ERROR);
-        CommitResult result = null;
+        CommitStatus status;
         try {
             List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
             txMaps.values().forEach(m -> updates.addAll(m.toMapUpdates()));
             Transaction transaction = new Transaction(transactionId, updates);
-            result = Futures.getUnchecked(transactionCommitter.apply(transaction));
-            return result == CommitResult.OK;
+            status = Futures.getUnchecked(transactionCommitter.apply(transaction)) == CommitResult.OK
+                    ? CommitStatus.SUCCESS : CommitStatus.FAILURE;
         } catch (Exception e) {
             abort();
-            return false;
+            status = CommitStatus.FAILURE;
         } finally {
             isOpen = false;
         }
+        return CompletableFuture.completedFuture(status);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java
index 39d24cf..e9e75a5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java
@@ -16,13 +16,16 @@
 package org.onosproject.store.primitives.impl;
 
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.CommitStatus;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TransactionContext;
 import org.onosproject.store.service.TransactionalMap;
+import org.onosproject.utils.MeteringAgent;
 
 import com.google.common.collect.Sets;
 
@@ -36,6 +39,7 @@
     private final TransactionId transactionId;
     private final TransactionCoordinator transactionCoordinator;
     private final Set<TransactionParticipant> txParticipants = Sets.newConcurrentHashSet();
+    private final MeteringAgent monitor;
 
     public NewDefaultTransactionContext(TransactionId transactionId,
             DistributedPrimitiveCreator creator,
@@ -43,6 +47,7 @@
         this.transactionId = transactionId;
         this.creator = creator;
         this.transactionCoordinator = transactionCoordinator;
+        this.monitor = new MeteringAgent("transactionContext", "*", true);
     }
 
     @Override
@@ -68,9 +73,10 @@
     }
 
     @Override
-    public boolean commit() {
-        transactionCoordinator.commit(transactionId, txParticipants).getNow(null);
-        return true;
+    public CompletableFuture<CommitStatus> commit() {
+        final MeteringAgent.Context timer = monitor.startTimer("commit");
+        return transactionCoordinator.commit(transactionId, txParticipants)
+                .whenComplete((r, e) -> timer.stop(e));
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
index 908a35d..1904894 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
@@ -22,6 +22,7 @@
 import org.onlab.util.Tools;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.CommitStatus;
 
 /**
  * Coordinator for a two-phase commit protocol.
@@ -37,45 +38,47 @@
     /**
      * Commits a transaction.
      *
-     * @param transactionId           transaction
+     * @param transactionId transaction identifier
      * @param transactionParticipants set of transaction participants
      * @return future for commit result
      */
-    CompletableFuture<Void> commit(TransactionId transactionId, Set<TransactionParticipant> transactionParticipants) {
+    CompletableFuture<CommitStatus> commit(TransactionId transactionId,
+                                           Set<TransactionParticipant> transactionParticipants) {
         if (!transactionParticipants.stream().anyMatch(t -> t.hasPendingUpdates())) {
-            return CompletableFuture.completedFuture(null);
+            return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
         }
 
-       return  transactions.put(transactionId, Transaction.State.PREPARING)
+        CompletableFuture<CommitStatus> status =  transactions.put(transactionId, Transaction.State.PREPARING)
                     .thenCompose(v -> this.doPrepare(transactionParticipants))
                     .thenCompose(result -> result
                            ? transactions.put(transactionId, Transaction.State.COMMITTING)
                                          .thenCompose(v -> doCommit(transactionParticipants))
-                                         .thenApply(v -> null)
+                                         .thenApply(v -> CommitStatus.SUCCESS)
                            : transactions.put(transactionId, Transaction.State.ROLLINGBACK)
                                          .thenCompose(v -> doRollback(transactionParticipants))
-                                         .thenApply(v -> null))
-                    .thenCompose(v -> transactions.remove(transactionId))
-                    .thenApply(v -> null);
+                                         .thenApply(v -> CommitStatus.FAILURE));
+        return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v));
     }
 
     private CompletableFuture<Boolean> doPrepare(Set<TransactionParticipant> transactionParticipants) {
-        return Tools.allOf(transactionParticipants
-                                           .stream()
-                                           .map(TransactionParticipant::prepare)
-                                           .collect(Collectors.toList()))
+        return Tools.allOf(transactionParticipants.stream()
+                                                  .filter(TransactionParticipant::hasPendingUpdates)
+                                                  .map(TransactionParticipant::prepare)
+                                                  .collect(Collectors.toList()))
                     .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
     }
 
     private CompletableFuture<Void> doCommit(Set<TransactionParticipant> transactionParticipants) {
         return CompletableFuture.allOf(transactionParticipants.stream()
-                                                              .map(p -> p.commit())
+                                                              .filter(TransactionParticipant::hasPendingUpdates)
+                                                              .map(TransactionParticipant::commit)
                                                               .toArray(CompletableFuture[]::new));
     }
 
     private CompletableFuture<Void> doRollback(Set<TransactionParticipant> transactionParticipants) {
         return CompletableFuture.allOf(transactionParticipants.stream()
-                                                              .map(p -> p.rollback())
+                                                              .filter(TransactionParticipant::hasPendingUpdates)
+                                                              .map(TransactionParticipant::rollback)
                                                               .toArray(CompletableFuture[]::new));
     }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 41f6e25..bb320e4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -87,13 +87,7 @@
     }
 
     private void handleEvent(List<MapEvent<String, byte[]>> events) {
-        events.forEach(event -> mapEventListeners.forEach(listener -> {
-            try {
-                listener.event(event);
-            } catch (Exception e) {
-                log.warn("Error processing map event", e);
-            }
-        }));
+        events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
     }
 
     @Override