ONOS-4218: Fixes for resource store transaction failures

Change-Id: Ie48bb04d7daf6ed7b63c33a3c3c2703496179aa6
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java b/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
index c8151e3..8f7cd1f 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
@@ -21,6 +21,8 @@
 
 import java.util.function.Function;
 
+import org.onlab.util.ByteArraySizeHashPrinter;
+
 import com.google.common.base.MoreObjects;
 
 /**
@@ -153,7 +155,7 @@
             .add("mapName", mapName)
             .add("type", type)
             .add("key", key)
-            .add("value", value)
+            .add("value", value instanceof byte[] ? new ByteArraySizeHashPrinter((byte[]) value) : value)
             .add("currentValue", currentValue)
             .add("currentVersion", currentVersion)
             .toString();
diff --git a/core/api/src/main/java/org/onosproject/store/service/CommitStatus.java b/core/api/src/main/java/org/onosproject/store/service/CommitStatus.java
new file mode 100644
index 0000000..8e86184
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/CommitStatus.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+/**
+ * Completion status of transaction.
+ */
+public enum CommitStatus {
+    /**
+     * Indicates a successfully completed transaction with all the updates committed.
+     */
+    SUCCESS,
+
+    /**
+     * Indicates a aborted transaction i.e. no updates were committed.
+     */
+    FAILURE
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
index 3027fcf..8f37caf 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
@@ -16,6 +16,8 @@
 
 package org.onosproject.store.service;
 
+import java.util.concurrent.CompletableFuture;
+
 import org.onosproject.store.primitives.TransactionId;
 
 /**
@@ -63,9 +65,9 @@
      * Commits a transaction that was previously started thereby making its changes permanent
      * and externally visible.
      *
-     * @return true if this transaction succeeded, otherwise false.
+     * @return A future that will be completed when the operation completes
      */
-    boolean commit();
+    CompletableFuture<CommitStatus> commit();
 
     /**
      * Aborts any changes made in this transaction context and discarding all locally cached updates.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
index aed07cd..e7c27bc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
@@ -20,6 +20,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Reference;
@@ -41,6 +42,7 @@
 import org.onosproject.net.resource.Resources;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.CommitStatus;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.Serializer;
@@ -178,18 +180,18 @@
             }
         }
 
-        boolean success = tx.commit();
-        if (success) {
-            log.trace("Transaction commit succeeded on registration: resources={}", resources);
-            List<ResourceEvent> events = resources.stream()
-                    .filter(x -> x.parent().isPresent())
-                    .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
-                    .collect(Collectors.toList());
-            notifyDelegate(events);
-        } else {
-            log.debug("Transaction commit failed on registration: resources={}", resources);
-        }
-        return success;
+        return tx.commit().whenComplete((status, error) -> {
+            if (status == CommitStatus.SUCCESS) {
+                log.trace("Transaction commit succeeded on registration: resources={}", resources);
+                List<ResourceEvent> events = resources.stream()
+                        .filter(x -> x.parent().isPresent())
+                        .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
+                        .collect(Collectors.toList());
+                notifyDelegate(events);
+            } else {
+                log.warn("Transaction commit failed on registration", error);
+            }
+        }).join() == CommitStatus.SUCCESS;
     }
 
     @Override
@@ -252,17 +254,17 @@
             }
         }
 
-        boolean success = tx.commit();
-        if (success) {
-            List<ResourceEvent> events = resources.stream()
-                    .filter(x -> x.parent().isPresent())
-                    .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
-                    .collect(Collectors.toList());
-            notifyDelegate(events);
-        } else {
-            log.warn("Failed to unregister {}: Commit failed.", ids);
-        }
-        return success;
+        return tx.commit().whenComplete((status, error) -> {
+            if (status == CommitStatus.SUCCESS) {
+                List<ResourceEvent> events = resources.stream()
+                        .filter(x -> x.parent().isPresent())
+                        .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
+                        .collect(Collectors.toList());
+                notifyDelegate(events);
+            } else {
+                log.warn("Failed to unregister {}: Commit failed.", ids, error);
+            }
+        }).join() == CommitStatus.SUCCESS;
     }
 
     @Override
@@ -308,7 +310,7 @@
             }
         }
 
-        return tx.commit();
+        return tx.commit().join() == CommitStatus.SUCCESS;
     }
 
     @Override
@@ -348,7 +350,7 @@
             }
         }
 
-        return tx.commit();
+        return tx.commit().join() == CommitStatus.SUCCESS;
     }
 
     // computational complexity: O(1) if the resource is discrete type.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
index 43cacb3..30a6f26 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
@@ -27,6 +27,7 @@
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.CommitResult;
+import org.onosproject.store.service.CommitStatus;
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TransactionContext;
@@ -96,22 +97,23 @@
 
     @SuppressWarnings("unchecked")
     @Override
-    public boolean commit() {
+    public CompletableFuture<CommitStatus> commit() {
         // TODO: rework commit implementation to be more intuitive
         checkState(isOpen, TX_NOT_OPEN_ERROR);
-        CommitResult result = null;
+        CommitStatus status;
         try {
             List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
             txMaps.values().forEach(m -> updates.addAll(m.toMapUpdates()));
             Transaction transaction = new Transaction(transactionId, updates);
-            result = Futures.getUnchecked(transactionCommitter.apply(transaction));
-            return result == CommitResult.OK;
+            status = Futures.getUnchecked(transactionCommitter.apply(transaction)) == CommitResult.OK
+                    ? CommitStatus.SUCCESS : CommitStatus.FAILURE;
         } catch (Exception e) {
             abort();
-            return false;
+            status = CommitStatus.FAILURE;
         } finally {
             isOpen = false;
         }
+        return CompletableFuture.completedFuture(status);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java
index 39d24cf..e9e75a5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java
@@ -16,13 +16,16 @@
 package org.onosproject.store.primitives.impl;
 
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.CommitStatus;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TransactionContext;
 import org.onosproject.store.service.TransactionalMap;
+import org.onosproject.utils.MeteringAgent;
 
 import com.google.common.collect.Sets;
 
@@ -36,6 +39,7 @@
     private final TransactionId transactionId;
     private final TransactionCoordinator transactionCoordinator;
     private final Set<TransactionParticipant> txParticipants = Sets.newConcurrentHashSet();
+    private final MeteringAgent monitor;
 
     public NewDefaultTransactionContext(TransactionId transactionId,
             DistributedPrimitiveCreator creator,
@@ -43,6 +47,7 @@
         this.transactionId = transactionId;
         this.creator = creator;
         this.transactionCoordinator = transactionCoordinator;
+        this.monitor = new MeteringAgent("transactionContext", "*", true);
     }
 
     @Override
@@ -68,9 +73,10 @@
     }
 
     @Override
-    public boolean commit() {
-        transactionCoordinator.commit(transactionId, txParticipants).getNow(null);
-        return true;
+    public CompletableFuture<CommitStatus> commit() {
+        final MeteringAgent.Context timer = monitor.startTimer("commit");
+        return transactionCoordinator.commit(transactionId, txParticipants)
+                .whenComplete((r, e) -> timer.stop(e));
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
index 908a35d..1904894 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
@@ -22,6 +22,7 @@
 import org.onlab.util.Tools;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.CommitStatus;
 
 /**
  * Coordinator for a two-phase commit protocol.
@@ -37,45 +38,47 @@
     /**
      * Commits a transaction.
      *
-     * @param transactionId           transaction
+     * @param transactionId transaction identifier
      * @param transactionParticipants set of transaction participants
      * @return future for commit result
      */
-    CompletableFuture<Void> commit(TransactionId transactionId, Set<TransactionParticipant> transactionParticipants) {
+    CompletableFuture<CommitStatus> commit(TransactionId transactionId,
+                                           Set<TransactionParticipant> transactionParticipants) {
         if (!transactionParticipants.stream().anyMatch(t -> t.hasPendingUpdates())) {
-            return CompletableFuture.completedFuture(null);
+            return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
         }
 
-       return  transactions.put(transactionId, Transaction.State.PREPARING)
+        CompletableFuture<CommitStatus> status =  transactions.put(transactionId, Transaction.State.PREPARING)
                     .thenCompose(v -> this.doPrepare(transactionParticipants))
                     .thenCompose(result -> result
                            ? transactions.put(transactionId, Transaction.State.COMMITTING)
                                          .thenCompose(v -> doCommit(transactionParticipants))
-                                         .thenApply(v -> null)
+                                         .thenApply(v -> CommitStatus.SUCCESS)
                            : transactions.put(transactionId, Transaction.State.ROLLINGBACK)
                                          .thenCompose(v -> doRollback(transactionParticipants))
-                                         .thenApply(v -> null))
-                    .thenCompose(v -> transactions.remove(transactionId))
-                    .thenApply(v -> null);
+                                         .thenApply(v -> CommitStatus.FAILURE));
+        return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v));
     }
 
     private CompletableFuture<Boolean> doPrepare(Set<TransactionParticipant> transactionParticipants) {
-        return Tools.allOf(transactionParticipants
-                                           .stream()
-                                           .map(TransactionParticipant::prepare)
-                                           .collect(Collectors.toList()))
+        return Tools.allOf(transactionParticipants.stream()
+                                                  .filter(TransactionParticipant::hasPendingUpdates)
+                                                  .map(TransactionParticipant::prepare)
+                                                  .collect(Collectors.toList()))
                     .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
     }
 
     private CompletableFuture<Void> doCommit(Set<TransactionParticipant> transactionParticipants) {
         return CompletableFuture.allOf(transactionParticipants.stream()
-                                                              .map(p -> p.commit())
+                                                              .filter(TransactionParticipant::hasPendingUpdates)
+                                                              .map(TransactionParticipant::commit)
                                                               .toArray(CompletableFuture[]::new));
     }
 
     private CompletableFuture<Void> doRollback(Set<TransactionParticipant> transactionParticipants) {
         return CompletableFuture.allOf(transactionParticipants.stream()
-                                                              .map(p -> p.rollback())
+                                                              .filter(TransactionParticipant::hasPendingUpdates)
+                                                              .map(TransactionParticipant::rollback)
                                                               .toArray(CompletableFuture[]::new));
     }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 41f6e25..bb320e4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -87,13 +87,7 @@
     }
 
     private void handleEvent(List<MapEvent<String, byte[]>> events) {
-        events.forEach(event -> mapEventListeners.forEach(listener -> {
-            try {
-                listener.event(event);
-            } catch (Exception e) {
-                log.warn("Error processing map event", e);
-            }
-        }));
+        events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
     }
 
     @Override