ONOS-2077: Limit the number of devices whose flow entries are backed in each communication round
Change-Id: I190a05bb1a123ad49edc6d2d192295c05587e410
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index f57ba3d..fd5ddea 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -106,6 +106,8 @@
private static final boolean DEFAULT_BACKUP_ENABLED = true;
private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+ // number of devices whose flow entries will be backed up in one communication round
+ private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
@Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
label = "Number of threads in the message handler pool")
@@ -638,7 +640,16 @@
}
}
+ private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
+ // split up the devices into smaller batches and send them separately.
+ Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
+ .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
+ }
+
private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
+ if (deviceIds.isEmpty()) {
+ return;
+ }
log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
Maps.newConcurrentMap();
@@ -750,7 +761,7 @@
}
});
// send the device flow entries to their respective backup nodes
- devicesToBackupByNode.forEach(this::backupFlowEntries);
+ devicesToBackupByNode.forEach(this::sendBackups);
} catch (Exception e) {
log.error("Backup failed.", e);
}