ONOS-2077: Limit the number of devices whose flow entries are backed up in each communication round

Change-Id: I190a05bb1a123ad49edc6d2d192295c05587e410
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index f57ba3d..fd5ddea 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -106,6 +106,8 @@
     private static final boolean DEFAULT_BACKUP_ENABLED = true;
     private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+    // Maximum number of devices whose flow entries are backed up in one communication round.
+    private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
 
     @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
             label = "Number of threads in the message handler pool")
@@ -638,7 +640,16 @@
             }
         }
 
+        private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
+            // Back up the devices in batches of at most FLOW_TABLE_BACKUP_BATCH_SIZE, one call per batch.
+            Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
+                     .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
+        }
+
         private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
+            if (deviceIds.isEmpty()) {
+                return;
+            }
             log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
             Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
                     Maps.newConcurrentMap();
@@ -750,7 +761,7 @@
                     }
                 });
                 // send the device flow entries to their respective backup nodes
-                devicesToBackupByNode.forEach(this::backupFlowEntries);
+                devicesToBackupByNode.forEach(this::sendBackups);
             } catch (Exception e) {
                 log.error("Backup failed.", e);
             }