[ONOS-7729] Copy flow buckets prior to replication to avoid inconsistent collection serialization
Change-Id: Ibad42af76fcc551c3326bbb6ed7627a04791b35a
(cherry picked from commit 4c3a0458e665f67e630aaa1f2d370bd3dab26cd4)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index e60a552..ec2532b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -373,7 +373,7 @@
// If the backup can be run (no concurrent backup to the node in progress) then run it.
BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
if (startBackup(operation, timestamp)) {
- backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
+ backup(bucket.copy(), nodeId).whenCompleteAsync((succeeded, error) -> {
if (error != null) {
log.debug("Backup operation {} failed", operation, error);
failBackup(operation);
@@ -639,11 +639,11 @@
/**
* Handles a flow bucket request.
*
- * @param bucket the bucket number
+ * @param bucketId the bucket number
* @return the flow bucket
*/
- private FlowBucket onGetBucket(int bucket) {
- return flowBuckets.get(bucket);
+ private FlowBucket onGetBucket(int bucketId) {
+ return flowBuckets.get(bucketId).copy();
}
/**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
index 2205b0b..34d470c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -18,6 +18,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.stream.Collectors;
import com.google.common.collect.Maps;
import org.onosproject.net.flow.DefaultFlowEntry;
@@ -39,11 +40,22 @@
private static final Logger LOGGER = LoggerFactory.getLogger(FlowBucket.class);
private final BucketId bucketId;
private volatile long term;
- private volatile LogicalTimestamp timestamp = new LogicalTimestamp(0);
- private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket = Maps.newConcurrentMap();
+ private volatile LogicalTimestamp timestamp;
+ private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket;
FlowBucket(BucketId bucketId) {
+ this(bucketId, 0, new LogicalTimestamp(0), Maps.newConcurrentMap());
+ }
+
+ private FlowBucket(
+ BucketId bucketId,
+ long term,
+ LogicalTimestamp timestamp,
+ Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket) {
this.bucketId = bucketId;
+ this.term = term;
+ this.timestamp = timestamp;
+ this.flowBucket = flowBucket;
}
/**
@@ -115,6 +127,22 @@
}
/**
+ * Returns a new copy of the flow bucket.
+ *
+ * @return a new copy of the flow bucket
+ */
+ FlowBucket copy() {
+ return new FlowBucket(
+ bucketId,
+ term,
+ timestamp,
+ flowBucket.entrySet()
+ .stream()
+ .map(e -> Maps.immutableEntry(e.getKey(), Maps.newHashMap(e.getValue())))
+ .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
+ }
+
+ /**
* Records an update to the bucket.
*/
private void recordUpdate(long term, LogicalTimestamp timestamp) {