[ONOS-7729] Copy flow buckets prior to replication to avoid inconsistent collection serialization

Change-Id: Ibad42af76fcc551c3326bbb6ed7627a04791b35a
(cherry picked from commit 4c3a0458e665f67e630aaa1f2d370bd3dab26cd4)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
index 2205b0b..34d470c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -18,6 +18,7 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Maps;
 import org.onosproject.net.flow.DefaultFlowEntry;
@@ -39,11 +40,22 @@
     private static final Logger LOGGER = LoggerFactory.getLogger(FlowBucket.class);
     private final BucketId bucketId;
     private volatile long term;
-    private volatile LogicalTimestamp timestamp = new LogicalTimestamp(0);
-    private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket = Maps.newConcurrentMap();
+    private volatile LogicalTimestamp timestamp;
+    private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket;
 
     FlowBucket(BucketId bucketId) {
+        this(bucketId, 0, new LogicalTimestamp(0), Maps.newConcurrentMap()); // term 0, timestamp 0, empty map
+    }
+
+    private FlowBucket(
+        BucketId bucketId,
+        long term,
+        LogicalTimestamp timestamp,
+        Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket) {
         this.bucketId = bucketId;
+        this.term = term;
+        this.timestamp = timestamp;
+        this.flowBucket = flowBucket; // kept by reference; no defensive copy is made here
     }
 
     /**
@@ -115,6 +127,22 @@
     }
 
     /**
+     * Creates a snapshot of this bucket whose per-flow maps are fresh {@code HashMap} copies.
+     *
+     * @return a new bucket with the same id, term and timestamp and copied flow maps
+     */
+    FlowBucket copy() {
+        // Read term and timestamp before snapshotting the entries.
+        final long snapshotTerm = term;
+        final LogicalTimestamp snapshotTimestamp = timestamp;
+        // Each per-flow map is copied into a new HashMap; the entries are not cloned.
+        final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> snapshot = flowBucket.entrySet()
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> Maps.newHashMap(e.getValue())));
+        return new FlowBucket(bucketId, snapshotTerm, snapshotTimestamp, snapshot);
+    }
+
+    /**
      * Records an update to the bucket.
      */
     private void recordUpdate(long term, LogicalTimestamp timestamp) {