FlowRuleStore: Configurable backup count

Adds a new "backupCount" component property (default 2) to
DistributedFlowRuleStore, controlling the maximum number of backup
copies kept for each device's flow table. Backup timestamps are now
tracked per (backup node, device) pair via a new BackupOperation key,
and the periodic backup task sends flow entries to up to backupCount
standby nodes, chosen in order of preference from the device's replica
info. backupEnabled and backupCount are volatile so reconfiguration
takes effect without restarting the backup task.

Change-Id: Ida4d3669e28e66350f4809539a48a456b6ec43c7
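Usage note (a sketch, assuming the standard ONOS `cfg` CLI command for component
configuration; the exact invocation may differ by release): the new property can
be adjusted at runtime, e.g.

    onos> cfg set org.onosproject.store.flow.impl.DistributedFlowRuleStore backupCount 3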
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index b3a2e36..ed68887 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -16,90 +16,92 @@
package org.onosproject.store.flow.impl;
import com.google.common.collect.ImmutableList;
- import com.google.common.collect.ImmutableMap;
- import com.google.common.collect.Iterables;
- import com.google.common.collect.Maps;
- import com.google.common.collect.Sets;
- import com.google.common.util.concurrent.Futures;
- import org.apache.felix.scr.annotations.Activate;
- import org.apache.felix.scr.annotations.Component;
- import org.apache.felix.scr.annotations.Deactivate;
- import org.apache.felix.scr.annotations.Modified;
- import org.apache.felix.scr.annotations.Property;
- import org.apache.felix.scr.annotations.Reference;
- import org.apache.felix.scr.annotations.ReferenceCardinality;
- import org.apache.felix.scr.annotations.Service;
- import org.onlab.util.KryoNamespace;
- import org.onlab.util.Tools;
- import org.onosproject.cfg.ComponentConfigService;
- import org.onosproject.cluster.ClusterService;
- import org.onosproject.cluster.NodeId;
- import org.onosproject.core.CoreService;
- import org.onosproject.core.IdGenerator;
- import org.onosproject.mastership.MastershipService;
- import org.onosproject.net.DeviceId;
- import org.onosproject.net.device.DeviceService;
- import org.onosproject.net.flow.CompletedBatchOperation;
- import org.onosproject.net.flow.DefaultFlowEntry;
- import org.onosproject.net.flow.FlowEntry;
- import org.onosproject.net.flow.FlowEntry.FlowEntryState;
- import org.onosproject.net.flow.FlowId;
- import org.onosproject.net.flow.FlowRule;
- import org.onosproject.net.flow.FlowRuleBatchEntry;
- import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
- import org.onosproject.net.flow.FlowRuleBatchEvent;
- import org.onosproject.net.flow.FlowRuleBatchOperation;
- import org.onosproject.net.flow.FlowRuleBatchRequest;
- import org.onosproject.net.flow.FlowRuleEvent;
- import org.onosproject.net.flow.FlowRuleEvent.Type;
- import org.onosproject.net.flow.FlowRuleService;
- import org.onosproject.net.flow.FlowRuleStore;
- import org.onosproject.net.flow.FlowRuleStoreDelegate;
- import org.onosproject.net.flow.StoredFlowEntry;
- import org.onosproject.net.flow.TableStatisticsEntry;
- import org.onosproject.persistence.PersistenceService;
- import org.onosproject.store.AbstractStore;
- import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
- import org.onosproject.store.cluster.messaging.ClusterMessage;
- import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
- import org.onosproject.store.flow.ReplicaInfoEvent;
- import org.onosproject.store.flow.ReplicaInfoEventListener;
- import org.onosproject.store.flow.ReplicaInfoService;
- import org.onosproject.store.impl.MastershipBasedTimestamp;
- import org.onosproject.store.serializers.KryoNamespaces;
- import org.onosproject.store.serializers.StoreSerializer;
- import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
- import org.onosproject.store.service.EventuallyConsistentMap;
- import org.onosproject.store.service.EventuallyConsistentMapEvent;
- import org.onosproject.store.service.EventuallyConsistentMapListener;
- import org.onosproject.store.service.Serializer;
- import org.onosproject.store.service.StorageService;
- import org.onosproject.store.service.WallClockTimestamp;
- import org.osgi.service.component.ComponentContext;
- import org.slf4j.Logger;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.CompletedBatchOperation;
+import org.onosproject.net.flow.DefaultFlowEntry;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowEntry.FlowEntryState;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchEntry;
+import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onosproject.net.flow.FlowRuleBatchEvent;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.FlowRuleBatchRequest;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleEvent.Type;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.FlowRuleStore;
+import org.onosproject.net.flow.FlowRuleStoreDelegate;
+import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.net.flow.TableStatisticsEntry;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.flow.ReplicaInfoEvent;
+import org.onosproject.store.flow.ReplicaInfoEventListener;
+import org.onosproject.store.flow.ReplicaInfoService;
+import org.onosproject.store.impl.MastershipBasedTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
import java.util.Collections;
- import java.util.Dictionary;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.Set;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.ScheduledFuture;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.concurrent.atomic.AtomicReference;
- import java.util.stream.Collectors;
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import static com.google.common.base.Strings.isNullOrEmpty;
- import static org.onlab.util.Tools.get;
- import static org.onlab.util.Tools.groupedThreads;
- import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
- import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
- import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
+import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
+import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of flow rules using a distributed state management protocol.
@@ -114,6 +116,7 @@
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
private static final boolean DEFAULT_BACKUP_ENABLED = true;
+ private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
@@ -126,7 +129,7 @@
@Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
label = "Indicates whether backups are enabled or not")
- private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
+ private volatile boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
@Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
label = "Delay in ms between successive backup runs")
@@ -135,6 +138,10 @@
label = "Indicates whether or not changes in the flow table should be persisted to disk.")
private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
+ @Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
+ label = "Max number of backup copies for each device")
+ private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
+
private InternalFlowTable flowTable = new InternalFlowTable();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -255,6 +262,7 @@
int newPoolSize;
boolean newBackupEnabled;
int newBackupPeriod;
+ int newBackupCount;
try {
String s = get(properties, "msgHandlerPoolSize");
newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
@@ -265,10 +273,13 @@
s = get(properties, "backupPeriod");
newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
+ s = get(properties, "backupCount");
+ newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
} catch (NumberFormatException | ClassCastException e) {
newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
newBackupEnabled = DEFAULT_BACKUP_ENABLED;
newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
+ newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
}
boolean restartBackupTask = false;
@@ -310,6 +321,9 @@
registerMessageHandlers(messageHandlingExecutor);
oldMsgHandler.shutdown();
}
+ if (backupCount != newBackupCount) {
+ backupCount = newBackupCount;
+ }
logConfig("Reconfigured");
}
@@ -340,8 +354,8 @@
}
private void logConfig(String prefix) {
- log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
- prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
+ log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}, backupCount = {}",
+ prefix, msgHandlerPoolSize, backupEnabled, backupPeriod, backupCount);
}
// This is not an efficient operation on a distributed sharded
@@ -652,15 +666,40 @@
}
}
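+ /**
+  * Key identifying a single backup relationship: the node holding the backup
+  * and the device whose flow table it backs up. Used to track per-node backup
+  * timestamps in lastBackupTimes.
+  */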
+ private class BackupOperation {
+ private final NodeId nodeId;
+ private final DeviceId deviceId;
+
+ public BackupOperation(NodeId nodeId, DeviceId deviceId) {
+ this.nodeId = nodeId;
+ this.deviceId = deviceId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(nodeId, deviceId);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other != null && other instanceof BackupOperation) {
+ BackupOperation that = (BackupOperation) other;
+ return this.nodeId.equals(that.nodeId) &&
+ this.deviceId.equals(that.deviceId);
+ } else {
+ return false;
+ }
+ }
+ }
+
private class InternalFlowTable implements ReplicaInfoEventListener {
//TODO replace the Map<V,V> with ExtendedSet
private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
flowEntries = Maps.newConcurrentMap();
- private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
+ private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
- private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
@Override
public void event(ReplicaInfoEvent event) {
@@ -668,41 +707,14 @@
}
private void handleEvent(ReplicaInfoEvent event) {
- if (!backupEnabled) {
+ DeviceId deviceId = event.subject();
+ if (!backupEnabled || !mastershipService.isLocalMaster(deviceId)) {
return;
}
- if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
- DeviceId deviceId = event.subject();
- NodeId master = mastershipService.getMasterFor(deviceId);
- if (!Objects.equals(local, master)) {
- // ignore since this event is for a device this node does not manage.
- return;
- }
- NodeId newBackupNode = getBackupNode(deviceId);
- NodeId currentBackupNode = lastBackupNodes.get(deviceId);
- if (Objects.equals(newBackupNode, currentBackupNode)) {
- // ignore since backup location hasn't changed.
- return;
- }
- if (currentBackupNode != null && newBackupNode == null) {
- // Current backup node is most likely down and no alternate backup node
- // has been chosen. Clear current backup location so that we can resume
- // backups when either current backup comes online or a different backup node
- // is chosen.
- log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
- + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
- lastBackupNodes.remove(deviceId);
- lastBackupTimes.remove(deviceId);
- return;
- // TODO: Pick any available node as backup and ensure hand-off occurs when
- // a new master is elected.
- }
- log.debug("Backup location for {} has changed from {} to {}.",
- deviceId, currentBackupNode, newBackupNode);
- backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
- 0,
- TimeUnit.SECONDS);
+ if (event.type() == MASTER_CHANGED) {
+ lastUpdateTimes.put(deviceId, System.currentTimeMillis());
}
+ backupSenderExecutor.schedule(this::backup, 0, TimeUnit.SECONDS);
}
private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
@@ -715,7 +727,7 @@
if (deviceIds.isEmpty()) {
return;
}
- log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
+ log.debug("Sending flowEntries for devices {} to {} for backup.", deviceIds, nodeId);
Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
deviceFlowEntries = Maps.newConcurrentMap();
deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id))));
@@ -737,8 +749,7 @@
}
if (backedupDevices != null) {
backedupDevices.forEach(id -> {
- lastBackupTimes.put(id, System.currentTimeMillis());
- lastBackupNodes.put(id, nodeId);
+ lastBackupTimes.put(new BackupOperation(nodeId, id), System.currentTimeMillis());
});
}
});
@@ -836,10 +847,11 @@
flowEntries.remove(deviceId);
}
- private NodeId getBackupNode(DeviceId deviceId) {
- List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
- // pick the standby which is most likely to become next master
- return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
+ private List<NodeId> getBackupNodes(DeviceId deviceId) {
+ // The returned backup node list is in order of preference, i.e. the most likely next master first.
+ List<NodeId> allPossibleBackupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
+ return ImmutableList.copyOf(allPossibleBackupNodes)
+ .subList(0, Math.min(allPossibleBackupNodes.size(), backupCount));
}
private void backup() {
@@ -847,29 +859,17 @@
return;
}
try {
- // determine the set of devices that we need to backup during this run.
- Set<DeviceId> devicesToBackup = flowEntries.keySet()
- .stream()
- .filter(mastershipService::isLocalMaster)
- .filter(deviceId -> {
- Long lastBackupTime = lastBackupTimes.get(deviceId);
- Long lastUpdateTime = lastUpdateTimes.get(deviceId);
- NodeId lastBackupNode = lastBackupNodes.get(deviceId);
- NodeId newBackupNode = getBackupNode(deviceId);
- return lastBackupTime == null
- || !Objects.equals(lastBackupNode, newBackupNode)
- || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
- })
- .collect(Collectors.toSet());
-
// compute a mapping from node to the set of devices whose flow entries it should backup
Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
- devicesToBackup.forEach(deviceId -> {
- NodeId backupLocation = getBackupNode(deviceId);
- if (backupLocation != null) {
- devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
- .add(deviceId);
- }
+ flowEntries.keySet().stream().forEach(deviceId -> {
+ List<NodeId> backupNodes = getBackupNodes(deviceId);
+ backupNodes.forEach(backupNode -> {
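+ // Schedule a backup to this node only if the device's flow table has been
+ // updated more recently than the last successful backup sent to that node.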
+ if (lastBackupTimes.getOrDefault(new BackupOperation(backupNode, deviceId), 0L)
+ < lastUpdateTimes.getOrDefault(deviceId, 0L)) {
+ devicesToBackupByNode.computeIfAbsent(backupNode,
+ nodeId -> Sets.newHashSet()).add(deviceId);
+ }
+ });
});
// send the device flow entries to their respective backup nodes
devicesToBackupByNode.forEach(this::sendBackups);