Implement anti-entropy protocol for detecting and resolving missing flows in the ECFlowRuleStore
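
This adds a periodic anti-entropy task, scheduled on the backup executor
alongside the existing backup task. For each device it masters, a node
builds a digest of every flow bucket (bucket ID plus last-update timestamp)
and advertises it to the device's backup nodes on the new
FLOW_TABLE_ANTI_ENTROPY subject. A backup node compares the advertised
timestamps against its own lastUpdateTimes and replies with the set of
bucket IDs whose local copy is missing or stale; the master logs the
detected divergence. FlowBucket now carries its last-update timestamp so
that backup nodes can record per-bucket update times when processing
backups.

The advertisement interval is configurable through the new antiEntropyPeriod
property (default 5000 ms), for example (assuming the standard ONOS cfg CLI):

    onos> cfg set org.onosproject.store.flow.impl.ECFlowRuleStore antiEntropyPeriod 10000
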
Change-Id: I90e1243a40b90328b9e4fdbeb36f830c20f18b5c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index b89bd58..9dc2f60 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -30,8 +30,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
@@ -97,6 +99,7 @@
import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
+import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_ANTI_ENTROPY;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
@@ -119,6 +122,7 @@
private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
+ private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
private static final int NUM_BUCKETS = 1024;
@@ -129,6 +133,11 @@
@Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
label = "Delay in ms between successive backup runs")
private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
+
+ @Property(name = "antiEntropyPeriod", intValue = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS,
+ label = "Delay in ms between anti-entropy runs")
+ private int antiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
+
@Property(name = "persistenceEnabled", boolValue = false,
label = "Indicates whether or not changes in the flow table should be persisted to disk.")
private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
@@ -168,6 +177,7 @@
private ExecutorService eventHandler;
private ScheduledFuture<?> backupTask;
+ private ScheduledFuture<?> antiEntropyTask;
private final ScheduledExecutorService backupSenderExecutor =
Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
@@ -215,6 +225,11 @@
0,
backupPeriod,
TimeUnit.MILLISECONDS);
+ antiEntropyTask = backupSenderExecutor.scheduleWithFixedDelay(
+ flowTable::runAntiEntropy,
+ 0,
+ antiEntropyPeriod,
+ TimeUnit.MILLISECONDS);
flowCounts = storageService.<BucketId, Integer>eventuallyConsistentMapBuilder()
.withName("onos-flow-counts")
@@ -262,6 +277,7 @@
int newPoolSize;
int newBackupPeriod;
int newBackupCount;
+ int newAntiEntropyPeriod;
try {
String s = get(properties, "msgHandlerPoolSize");
newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
@@ -271,18 +287,29 @@
s = get(properties, "backupCount");
newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
+
+ s = get(properties, "antiEntropyPeriod");
+ newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
} catch (NumberFormatException | ClassCastException e) {
newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
+ newAntiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
}
boolean restartBackupTask = false;
+ boolean restartAntiEntropyTask = false;
if (newBackupPeriod != backupPeriod) {
backupPeriod = newBackupPeriod;
restartBackupTask = true;
}
+
+ if (newAntiEntropyPeriod != antiEntropyPeriod) {
+ antiEntropyPeriod = newAntiEntropyPeriod;
+ restartAntiEntropyTask = true;
+ }
+
if (restartBackupTask) {
if (backupTask != null) {
// cancel previously running task
@@ -294,6 +321,19 @@
backupPeriod,
TimeUnit.MILLISECONDS);
}
+
+ if (restartAntiEntropyTask) {
+ if (antiEntropyTask != null) {
+ // cancel previously running task
+ antiEntropyTask.cancel(false);
+ }
+ antiEntropyTask = backupSenderExecutor.scheduleWithFixedDelay(
+ flowTable::runAntiEntropy,
+ 0,
+ antiEntropyPeriod,
+ TimeUnit.MILLISECONDS);
+ }
+
if (newPoolSize != msgHandlerPoolSize) {
msgHandlerPoolSize = newPoolSize;
ExecutorService oldMsgHandler = messageHandlingExecutor;
@@ -304,6 +344,7 @@
registerMessageHandlers(messageHandlingExecutor);
oldMsgHandler.shutdown();
}
+
if (backupCount != newBackupCount) {
backupCount = newBackupCount;
}
@@ -323,6 +364,8 @@
REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
clusterCommunicator.addSubscriber(
FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackup, serializer::encode, executor);
+ clusterCommunicator.addSubscriber(
+ FLOW_TABLE_ANTI_ENTROPY, serializer::decode, flowTable::onAntiEntropy, serializer::encode, executor);
}
private void unregisterMessageHandlers() {
@@ -332,6 +375,7 @@
clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
+ clusterCommunicator.removeSubscriber(FLOW_TABLE_ANTI_ENTROPY);
}
private void logConfig(String prefix) {
@@ -733,6 +777,7 @@
private class FlowBucket {
private final BucketId bucketId;
private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table;
+ private final long timestamp;
BucketId bucketId() {
return bucketId;
@@ -742,9 +787,78 @@
return table;
}
- FlowBucket(BucketId bucketId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table) {
+ long timestamp() {
+ return timestamp;
+ }
+
+ FlowBucket(BucketId bucketId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table, long timestamp) {
this.bucketId = bucketId;
this.table = table;
+ this.timestamp = timestamp;
+ }
+ }
+
+ /**
+ * Device digest.
+ */
+ private class DeviceDigest {
+ private final DeviceId deviceId;
+ private final Set<FlowBucketDigest> digests;
+
+ DeviceDigest(DeviceId deviceId, Set<FlowBucketDigest> digests) {
+ this.deviceId = deviceId;
+ this.digests = digests;
+ }
+
+ DeviceId deviceId() {
+ return deviceId;
+ }
+
+ Set<FlowBucketDigest> digests() {
+ return digests;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(deviceId);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ return object instanceof DeviceDigest
+ && ((DeviceDigest) object).deviceId.equals(deviceId);
+ }
+ }
+
+ /**
+ * Flow bucket digest.
+ */
+ private class FlowBucketDigest {
+ private final BucketId bucketId;
+ private final long timestamp;
+
+ FlowBucketDigest(BucketId bucketId, long timestamp) {
+ this.bucketId = bucketId;
+ this.timestamp = timestamp;
+ }
+
+ BucketId bucketId() {
+ return bucketId;
+ }
+
+ long timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bucketId);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ return object instanceof FlowBucketDigest
+ && ((FlowBucketDigest) object).bucketId.equals(bucketId);
}
}
@@ -777,6 +891,30 @@
}
/**
+ * Returns the set of devices in the flow table.
+ *
+ * @return the set of devices in the flow table
+ */
+ private Set<DeviceId> getDevices() {
+ return flowEntries.keySet();
+ }
+
+ /**
+ * Returns the digests for all buckets in the flow table for the given device.
+ *
+ * @param deviceId the device for which to return digests
+ * @return the set of digests for all buckets for the given device
+ */
+ private Set<FlowBucketDigest> getDigests(DeviceId deviceId) {
+ return IntStream.range(0, NUM_BUCKETS)
+ .mapToObj(bucket -> {
+ BucketId bucketId = new BucketId(deviceId, bucket);
+ long timestamp = lastUpdateTimes.getOrDefault(bucketId, 0L);
+ return new FlowBucketDigest(bucketId, timestamp);
+ }).collect(Collectors.toSet());
+ }
+
+ /**
* Returns the flow table for specified device.
*
* @param deviceId identifier of the device
@@ -801,11 +939,13 @@
}
private FlowBucket getFlowBucket(BucketId bucketId) {
+ long timestamp = lastUpdateTimes.getOrDefault(bucketId, 0L);
return new FlowBucket(bucketId, getFlowTable(bucketId.deviceId())
.entrySet()
.stream()
.filter(entry -> isInBucket(entry.getKey(), bucketId.bucket()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
+ timestamp);
}
private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
@@ -927,7 +1067,7 @@
* Backs up all devices to all backup nodes.
*/
private void backup() {
- for (DeviceId deviceId : flowEntries.keySet()) {
+ for (DeviceId deviceId : getDevices()) {
backup(deviceId);
}
}
@@ -1036,8 +1176,8 @@
FlowBucket flowBucket = getFlowBucket(operation.bucketId());
CompletableFuture<Boolean> future = new CompletableFuture<>();
- clusterCommunicator.<FlowBucket, Set<FlowId>>
- sendAndReceive(flowBucket,
+ clusterCommunicator.<FlowBucket, Set<FlowId>>sendAndReceive(
+ flowBucket,
FLOW_TABLE_BACKUP,
serializer::encode,
serializer::decode,
@@ -1080,6 +1220,7 @@
.removeIf(entry -> isInBucket(entry.getKey(), flowBucket.bucketId().bucket())
&& !flowBucket.table().containsKey(entry.getKey()));
backedupFlows.addAll(flowBucket.table().keySet());
+ lastUpdateTimes.put(flowBucket.bucketId(), flowBucket.timestamp());
}
} catch (Exception e) {
log.warn("Failure processing backup request", e);
@@ -1088,6 +1229,97 @@
}
/**
+ * Runs the anti-entropy protocol.
+ */
+ private void runAntiEntropy() {
+ for (DeviceId deviceId : getDevices()) {
+ runAntiEntropy(deviceId);
+ }
+ }
+
+ /**
+ * Runs the anti-entropy protocol for the given device.
+ *
+ * @param deviceId the device for which to run the anti-entropy protocol
+ */
+ private void runAntiEntropy(DeviceId deviceId) {
+ if (!isMasterNode(deviceId)) {
+ return;
+ }
+
+ // Get the set of digests for the node.
+ Set<FlowBucketDigest> digests = getDigests(deviceId);
+
+ // Get a list of backup nodes for the device and compute the real backup count.
+ List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
+ int availableBackupCount = Math.min(backupCount, backupNodes.size());
+
+ // Iterate through backup nodes and run the anti-entropy protocol.
+ for (int index = 0; index < availableBackupCount; index++) {
+ NodeId backupNode = backupNodes.get(index);
+ try {
+ runAntiEntropy(deviceId, backupNode, digests);
+ } catch (Exception e) {
+ log.error("Anti-entropy for " + deviceId + " to " + backupNode + " failed", e);
+ }
+ }
+ }
+
+ /**
+ * Sends an anti-entropy advertisement to the given node.
+ *
+ * @param deviceId the device ID for which to send the advertisement
+ * @param nodeId the node to which to send the advertisement
+ * @param digests the digests to send to the given node
+ */
+ private void runAntiEntropy(DeviceId deviceId, NodeId nodeId, Set<FlowBucketDigest> digests) {
+ log.trace("Sending anti-entropy advertisement for device {} to {}", deviceId, nodeId);
+ clusterCommunicator.<DeviceDigest, Set<BucketId>>sendAndReceive(
+ new DeviceDigest(deviceId, digests),
+ FLOW_TABLE_ANTI_ENTROPY,
+ serializer::encode,
+ serializer::decode,
+ nodeId)
+ .whenComplete((missingBuckets, error) -> {
+ if (error == null) {
+ log.debug("Detected {} missing buckets on node {} for device {}",
+ missingBuckets.size(), nodeId, deviceId);
+ } else {
+ log.trace("Anti-entropy advertisement for device {} to {} failed", deviceId, nodeId, error);
+ }
+ });
+ }
+
+ /**
+ * Handles a device anti-entropy request from a remote peer.
+ *
+ * @param digest the device digest
+ * @return the set of flow buckets to update
+ */
+ private Set<BucketId> onAntiEntropy(DeviceDigest digest) {
+ // If the local node is the master, reject the anti-entropy request.
+ // TODO: We should really use mastership terms in anti-entropy requests to determine whether this
+ // node is a newer master, but that would only reduce the time it takes to resolve missing flows,
+ // since a later anti-entropy request will still succeed once this node recognizes it is no longer the master.
+ NodeId master = replicaInfoManager.getReplicaInfoFor(digest.deviceId())
+ .master()
+ .orElse(null);
+ if (Objects.equals(master, local)) {
+ return ImmutableSet.of();
+ }
+
+ // Compute a set of missing BucketIds based on digest times and send them back to the master.
+ Set<BucketId> missingBuckets = new HashSet<>();
+ for (FlowBucketDigest flowBucketDigest : digest.digests()) {
+ long lastUpdated = lastUpdateTimes.getOrDefault(flowBucketDigest.bucketId(), 0L);
+ if (lastUpdated < flowBucketDigest.timestamp()) {
+ missingBuckets.add(flowBucketDigest.bucketId());
+ }
+ }
+ return missingBuckets;
+ }
+
+ /**
* Updates all flow counts for the given device.
*
* @param deviceId the device for which to update flow counts
@@ -1115,8 +1347,7 @@
}
@Override
- public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
- List<TableStatisticsEntry> tableStats) {
+ public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
deviceTableStats.put(deviceId, tableStats);
return null;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index fd4f6d7..254f5df 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -40,4 +40,7 @@
public static final MessageSubject FLOW_TABLE_BACKUP
= new MessageSubject("peer-flow-table-backup");
+
+ public static final MessageSubject FLOW_TABLE_ANTI_ENTROPY
+ = new MessageSubject("peer-flow-table-anti-entropy");
}