Improvement: Ensure configuration options are current and valid in NewDistributedFlowRuleStore
Bug fix: Only accept backups for devices that the local node does not manage.
Change-Id: If7b1e8c3b0339e5d756e250c38fe53dc191084d1
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index cd85a75..e042982 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -81,6 +81,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -106,6 +107,7 @@
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
private static final boolean DEFAULT_BACKUP_ENABLED = true;
+ private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
@Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
@@ -116,6 +118,10 @@
label = "Indicates whether backups are enabled or not")
private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
+ @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
+ label = "Delay in ms between successive backup runs")
+ private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
+
private InternalFlowTable flowTable = new InternalFlowTable();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -142,6 +148,7 @@
private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
private ExecutorService messageHandlingExecutor;
+ private ScheduledFuture<?> backupTask;
private final ScheduledExecutorService backupSenderExecutor =
Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
@@ -173,7 +180,13 @@
registerMessageHandlers(messageHandlingExecutor);
- backupSenderExecutor.scheduleWithFixedDelay(() -> flowTable.backup(), 0, 2000, TimeUnit.MILLISECONDS);
+ if (backupEnabled) {
+ backupTask = backupSenderExecutor.scheduleWithFixedDelay(
+ flowTable::backup,
+ 0,
+ backupPeriod,
+ TimeUnit.MILLISECONDS);
+ }
logConfig("Started");
}
@@ -199,6 +212,7 @@
Dictionary properties = context.getProperties();
int newPoolSize;
boolean newBackupEnabled;
+ int newBackupPeriod;
try {
String s = get(properties, "msgHandlerPoolSize");
newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
@@ -206,13 +220,38 @@
s = get(properties, "backupEnabled");
newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
+ s = get(properties, "backupPeriod");
+ newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
+
} catch (NumberFormatException | ClassCastException e) {
newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
newBackupEnabled = DEFAULT_BACKUP_ENABLED;
+ newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
}
+ boolean restartBackupTask = false;
if (newBackupEnabled != backupEnabled) {
backupEnabled = newBackupEnabled;
+ if (!backupEnabled && backupTask != null) {
+ backupTask.cancel(false);
+ backupTask = null;
+ }
+ restartBackupTask = backupEnabled;
+ }
+ if (newBackupPeriod != backupPeriod) {
+ backupPeriod = newBackupPeriod;
+ restartBackupTask = backupEnabled;
+ }
+ if (restartBackupTask) {
+ if (backupTask != null) {
+ // cancel previously running task
+ backupTask.cancel(false);
+ }
+ backupTask = backupSenderExecutor.scheduleWithFixedDelay(
+ flowTable::backup,
+ 0,
+ backupPeriod,
+ TimeUnit.MILLISECONDS);
}
if (newPoolSize != msgHandlerPoolSize) {
msgHandlerPoolSize = newPoolSize;
@@ -254,8 +293,8 @@
}
private void logConfig(String prefix) {
- log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}",
- prefix, msgHandlerPoolSize, backupEnabled);
+ log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
+ prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
}
// This is not a efficient operation on a distributed sharded
@@ -620,6 +659,9 @@
}
private void backup() {
+ if (!backupEnabled) {
+ return;
+ }
//TODO: Force backup when backups change.
try {
// determine the set of devices that we need to backup during this run.
@@ -674,11 +716,13 @@
private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
- Maps.filterKeys(flowTables, managedDevices::contains).forEach((deviceId, flowTable) -> {
- Map<FlowId, Set<StoredFlowEntry>> deviceFlowTable = getFlowTable(deviceId);
- deviceFlowTable.clear();
- deviceFlowTable.putAll(flowTable);
- });
+ // Only process those devices that are not managed by the local node.
+ Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
+ .forEach((deviceId, flowTable) -> {
+ Map<FlowId, Set<StoredFlowEntry>> deviceFlowTable = getFlowTable(deviceId);
+ deviceFlowTable.clear();
+ deviceFlowTable.putAll(flowTable);
+ });
}
}
}