Move flow count updates off the write path in ECFlowRuleStore
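
Computing the per-bucket flow count on every flow table update adds work to
the write path. This change removes the count computation from recordUpdate()
and instead refreshes the eventually consistent flowCounts map from the backup
path: updateFlowCounts(FlowBucket) runs when a bucket backup is initiated, and
updateDeviceFlowCounts(DeviceId) covers devices that currently have no backup
nodes. A condensed before/after sketch of recordUpdate() (simplified from the
diff below, not the literal code):

    // Before: the flow count was recomputed on every write.
    private void recordUpdate(BucketId bucketId) {
        lastUpdateTimes.put(bucketId, System.currentTimeMillis());
        FlowBucket flowBucket = getFlowBucket(bucketId);
        flowCounts.put(bucketId, flowBucket.table().entrySet().stream()
                .mapToInt(e -> e.getValue().values().size())
                .sum());
    }

    // After: writes only record the update time; counts are refreshed by
    // updateFlowCounts(FlowBucket) as part of the backup cycle.
    private void recordUpdate(BucketId bucketId) {
        lastUpdateTimes.put(bucketId, System.currentTimeMillis());
    }

The backup path is also restructured to iterate over devices rather than
cluster nodes, resolving mastership and backup nodes per device.
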
Change-Id: I44c611625baec124a45524ddb39fbe74f4c3c907
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 8efb40a..204f98f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -31,7 +31,11 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
+import com.google.common.util.concurrent.Futures;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -56,11 +60,6 @@
import org.onosproject.net.flow.FlowEntry.FlowEntryState;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleEvent.Type;
import org.onosproject.net.flow.FlowRuleService;
@@ -68,6 +67,11 @@
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.TableStatisticsEntry;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
@@ -87,11 +91,6 @@
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
@@ -323,7 +322,7 @@
clusterCommunicator.addSubscriber(
REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
clusterCommunicator.addSubscriber(
- FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackupReceipt, serializer::encode, executor);
+ FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackup, serializer::encode, executor);
}
private void unregisterMessageHandlers() {
@@ -851,12 +850,6 @@
private void recordUpdate(BucketId bucketId) {
lastUpdateTimes.put(bucketId, System.currentTimeMillis());
- FlowBucket flowBucket = getFlowBucket(bucketId);
- int flowCount = flowBucket.table().entrySet()
- .stream()
- .mapToInt(e -> e.getValue().values().size())
- .sum();
- flowCounts.put(bucketId, flowCount);
}
public void add(FlowEntry rule) {
@@ -925,37 +918,63 @@
flowEntries.clear();
}
+ /**
+ * Returns a boolean indicating whether the local node is the current master for the given device.
+ *
+     * @param deviceId the device to check
+     * @return true if the local node is the current master for the given device, otherwise false
+ */
private boolean isMasterNode(DeviceId deviceId) {
NodeId master = replicaInfoManager.getReplicaInfoFor(deviceId).master().orElse(null);
return Objects.equals(master, clusterService.getLocalNode().id());
}
- private boolean isBackupNode(NodeId nodeId, DeviceId deviceId) {
- List<NodeId> allPossibleBackupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
- return allPossibleBackupNodes.indexOf(nodeId) < backupCount;
- }
-
+ /**
+     * Backs up all devices mastered by the local node to their backup nodes.
+ */
private void backup() {
- clusterService.getNodes().stream()
- .filter(node -> !node.id().equals(clusterService.getLocalNode().id()))
- .forEach(node -> {
- try {
- backup(node.id());
- } catch (Exception e) {
- log.error("Backup failed.", e);
- }
- });
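+        // Iterate over devices with flow entries; mastership and backup nodes are resolved per device in backup(DeviceId).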
+ for (DeviceId deviceId : flowEntries.keySet()) {
+ backup(deviceId);
+ }
}
- private void backup(NodeId nodeId) {
- for (DeviceId deviceId : flowEntries.keySet()) {
- if (isMasterNode(deviceId) && isBackupNode(nodeId, deviceId)) {
- backup(nodeId, deviceId);
+ /**
+     * Backs up all buckets for the given device to its backup nodes.
+ *
+ * @param deviceId the device to back up
+ */
+ private void backup(DeviceId deviceId) {
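+        // Only the current master for the device replicates its flow table.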
+ if (!isMasterNode(deviceId)) {
+ return;
+ }
+
+ // Get a list of backup nodes for the device.
+ List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
+ int availableBackupCount = Math.min(backupCount, backupNodes.size());
+
+ // If the list of backup nodes is empty, update the flow count.
+ if (availableBackupCount == 0) {
+ updateDeviceFlowCounts(deviceId);
+ } else {
+            // Otherwise, iterate through the backup nodes and back up the device to each.
+ for (int index = 0; index < availableBackupCount; index++) {
+ NodeId backupNode = backupNodes.get(index);
+ try {
+ backup(deviceId, backupNode);
+ } catch (Exception e) {
+ log.error("Backup of " + deviceId + " to " + backupNode + " failed", e);
+ }
}
}
}
- private void backup(NodeId nodeId, DeviceId deviceId) {
+ /**
+ * Backs up all buckets for the given device to the given node.
+ *
+ * @param deviceId the device to back up
+ * @param nodeId the node to which to back up the device
+ */
+ private void backup(DeviceId deviceId, NodeId nodeId) {
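+        // The timestamp taken here becomes the operation's last backup time if the backup succeeds.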
final long timestamp = System.currentTimeMillis();
for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
BucketId bucketId = new BucketId(deviceId, bucket);
@@ -967,27 +986,56 @@
} else {
failBackup(operation);
}
- backup(nodeId);
+ backup(deviceId, nodeId);
}, backupSenderExecutor);
}
}
}
+ /**
+ * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
+ * <p>
+ * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
+ * are pending replication for the backup operation.
+ *
+     * @param operation the backup operation to check
+     * @return true if the backup operation can be started, otherwise false
+ */
private boolean startBackup(BackupOperation operation) {
long lastBackupTime = lastBackupTimes.getOrDefault(operation, 0L);
long lastUpdateTime = lastUpdateTimes.getOrDefault(operation.bucketId(), 0L);
return lastUpdateTime > 0 && lastBackupTime <= lastUpdateTime && inFlightUpdates.add(operation);
}
+ /**
+ * Fails the given backup operation.
+ *
+ * @param operation the backup operation to fail
+ */
private void failBackup(BackupOperation operation) {
inFlightUpdates.remove(operation);
}
+ /**
+ * Succeeds the given backup operation.
+ * <p>
+ * The last backup time for the operation will be updated and the operation will be removed from
+ * in-flight updates.
+ *
+ * @param operation the operation to succeed
+ * @param timestamp the timestamp at which the operation was <em>started</em>
+ */
private void succeedBackup(BackupOperation operation, long timestamp) {
- inFlightUpdates.remove(operation);
lastBackupTimes.put(operation, timestamp);
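+        // Clear the in-flight marker only after the backup time has been recorded.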
+ inFlightUpdates.remove(operation);
}
+ /**
+ * Performs the given backup operation.
+ *
+ * @param operation the operation to perform
+ * @return a future to be completed with a boolean indicating whether the backup operation was successful
+ */
private CompletableFuture<Boolean> backup(BackupOperation operation) {
log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.",
operation.bucketId().bucket(), operation.bucketId().deviceId(), operation.nodeId());
@@ -1010,10 +1058,18 @@
}
future.complete(backedupFlows != null);
});
+
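+        // Refresh the eventually consistent flow count for this bucket here, as part of the backup cycle, rather than on the write path.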
+ updateFlowCounts(flowBucket);
return future;
}
- private Set<FlowId> onBackupReceipt(FlowBucket flowBucket) {
+ /**
+ * Handles a flow bucket backup from a remote peer.
+ *
+ * @param flowBucket the flow bucket to back up
+     * @return the set of flows that were backed up
+ */
+ private Set<FlowId> onBackup(FlowBucket flowBucket) {
log.debug("Received flowEntries for {} bucket {} to backup",
flowBucket.bucketId().deviceId(), flowBucket.bucketId);
Set<FlowId> backedupFlows = Sets.newHashSet();
@@ -1036,6 +1092,32 @@
}
return backedupFlows;
}
+
+ /**
+     * Updates the flow counts for all buckets of the given device.
+ *
+ * @param deviceId the device for which to update flow counts
+ */
+ private void updateDeviceFlowCounts(DeviceId deviceId) {
+ for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
+ BucketId bucketId = new BucketId(deviceId, bucket);
+ FlowBucket flowBucket = getFlowBucket(bucketId);
+ updateFlowCounts(flowBucket);
+ }
+ }
+
+ /**
+ * Updates the eventually consistent flow count for the given bucket.
+ *
+ * @param flowBucket the flow bucket for which to update flow counts
+ */
+ private void updateFlowCounts(FlowBucket flowBucket) {
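+        // Sum the stored entries across all flow IDs in the bucket.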
+ int flowCount = flowBucket.table().entrySet()
+ .stream()
+ .mapToInt(e -> e.getValue().values().size())
+ .sum();
+ flowCounts.put(flowBucket.bucketId(), flowCount);
+ }
}
@Override