Improvement: Ensure configurations options are current and valid in NewDistributedFlowRuleStore
Bug fix: Only accept backups for devices that the local node does not manage.

Change-Id: If7b1e8c3b0339e5d756e250c38fe53dc191084d1
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index cd85a75..e042982 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -81,6 +81,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -106,6 +107,7 @@
 
     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
     private static final boolean DEFAULT_BACKUP_ENABLED = true;
+    private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
 
     @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
@@ -116,6 +118,10 @@
             label = "Indicates whether backups are enabled or not")
     private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
 
+    @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
+            label = "Delay in ms between successive backup runs")
+    private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
+
     private InternalFlowTable flowTable = new InternalFlowTable();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -142,6 +148,7 @@
     private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
     private ExecutorService messageHandlingExecutor;
 
+    private ScheduledFuture<?> backupTask;
     private final ScheduledExecutorService backupSenderExecutor =
             Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
 
@@ -173,7 +180,13 @@
 
         registerMessageHandlers(messageHandlingExecutor);
 
-        backupSenderExecutor.scheduleWithFixedDelay(() -> flowTable.backup(), 0, 2000, TimeUnit.MILLISECONDS);
+        if (backupEnabled) {
+            backupTask = backupSenderExecutor.scheduleWithFixedDelay(
+                    flowTable::backup,
+                    0,
+                    backupPeriod,
+                    TimeUnit.MILLISECONDS);
+        }
 
         logConfig("Started");
     }
@@ -199,6 +212,7 @@
         Dictionary properties = context.getProperties();
         int newPoolSize;
         boolean newBackupEnabled;
+        int newBackupPeriod;
         try {
             String s = get(properties, "msgHandlerPoolSize");
             newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
@@ -206,13 +220,38 @@
             s = get(properties, "backupEnabled");
             newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
 
+            s = get(properties, "backupPeriod");
+            newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
+
         } catch (NumberFormatException | ClassCastException e) {
             newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
             newBackupEnabled = DEFAULT_BACKUP_ENABLED;
+            newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
         }
 
+        boolean restartBackupTask = false;
         if (newBackupEnabled != backupEnabled) {
             backupEnabled = newBackupEnabled;
+            if (!backupEnabled && backupTask != null) {
+                backupTask.cancel(false);
+                backupTask = null;
+            }
+            restartBackupTask = backupEnabled;
+        }
+        if (newBackupPeriod != backupPeriod) {
+            backupPeriod = newBackupPeriod;
+            restartBackupTask = backupEnabled;
+        }
+        if (restartBackupTask) {
+            if (backupTask != null) {
+                // cancel previously running task
+                backupTask.cancel(false);
+            }
+            backupTask = backupSenderExecutor.scheduleWithFixedDelay(
+                    flowTable::backup,
+                    0,
+                    backupPeriod,
+                    TimeUnit.MILLISECONDS);
         }
         if (newPoolSize != msgHandlerPoolSize) {
             msgHandlerPoolSize = newPoolSize;
@@ -254,8 +293,8 @@
     }
 
     private void logConfig(String prefix) {
-        log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}",
-                 prefix, msgHandlerPoolSize, backupEnabled);
+        log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
+                 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
     }
 
     // This is not a efficient operation on a distributed sharded
@@ -620,6 +659,9 @@
         }
 
         private void backup() {
+            if (!backupEnabled) {
+                return;
+            }
             //TODO: Force backup when backups change.
             try {
                 // determine the set of devices that we need to backup during this run.
@@ -674,11 +716,13 @@
 
         private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
             Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
-            Maps.filterKeys(flowTables, managedDevices::contains).forEach((deviceId, flowTable) -> {
-                Map<FlowId, Set<StoredFlowEntry>> deviceFlowTable = getFlowTable(deviceId);
-                deviceFlowTable.clear();
-                deviceFlowTable.putAll(flowTable);
-            });
+            // Only process those devices are that not managed by the local node.
+            Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
+                .forEach((deviceId, flowTable) -> {
+                    Map<FlowId, Set<StoredFlowEntry>> deviceFlowTable = getFlowTable(deviceId);
+                    deviceFlowTable.clear();
+                    deviceFlowTable.putAll(flowTable);
+                });
         }
     }
 }