[ONOS-7729] Copy flow buckets prior to replication to avoid inconsistent collection serialization

Change-Id: Ibad42af76fcc551c3326bbb6ed7627a04791b35a
(cherry picked from commit 4c3a0458e665f67e630aaa1f2d370bd3dab26cd4)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index e60a552..ec2532b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -373,7 +373,7 @@
             // If the backup can be run (no concurrent backup to the node in progress) then run it.
             BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
             if (startBackup(operation, timestamp)) {
-                backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
+                backup(bucket.copy(), nodeId).whenCompleteAsync((succeeded, error) -> {
                     if (error != null) {
                         log.debug("Backup operation {} failed", operation, error);
                         failBackup(operation);
@@ -639,11 +639,11 @@
     /**
      * Handles a flow bucket request.
      *
-     * @param bucket the bucket number
+     * @param bucketId the bucket number
      * @return the flow bucket
      */
-    private FlowBucket onGetBucket(int bucket) {
-        return flowBuckets.get(bucket);
+    private FlowBucket onGetBucket(int bucketId) {
+        return flowBuckets.get(bucketId).copy();
     }
 
     /**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
index 2205b0b..34d470c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -18,6 +18,7 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Maps;
 import org.onosproject.net.flow.DefaultFlowEntry;
@@ -39,11 +40,22 @@
     private static final Logger LOGGER = LoggerFactory.getLogger(FlowBucket.class);
     private final BucketId bucketId;
     private volatile long term;
-    private volatile LogicalTimestamp timestamp = new LogicalTimestamp(0);
-    private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket = Maps.newConcurrentMap();
+    private volatile LogicalTimestamp timestamp;
+    private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket;
 
     FlowBucket(BucketId bucketId) {
+        this(bucketId, 0, new LogicalTimestamp(0), Maps.newConcurrentMap());
+    }
+
+    private FlowBucket(
+        BucketId bucketId,
+        long term,
+        LogicalTimestamp timestamp,
+        Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket) {
         this.bucketId = bucketId;
+        this.term = term;
+        this.timestamp = timestamp;
+        this.flowBucket = flowBucket;
     }
 
     /**
@@ -115,6 +127,22 @@
     }
 
     /**
+     * Returns a new copy of the flow bucket.
+     *
+     * @return a new copy of the flow bucket
+     */
+    FlowBucket copy() {
+        return new FlowBucket(
+            bucketId,
+            term,
+            timestamp,
+            flowBucket.entrySet()
+                .stream()
+                .map(e -> Maps.immutableEntry(e.getKey(), Maps.newHashMap(e.getValue())))
+                .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
+    }
+
+    /**
      * Records an update to the bucket.
      */
     private void recordUpdate(long term, LogicalTimestamp timestamp) {