[CORD-3119] Fix missing flows on node restart
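
Key the replicated flow counts and the backup bookkeeping by a
per-device, per-bucket BucketId instead of nested per-device maps.
recordUpdate() now refreshes the replicated per-bucket flow count
whenever a bucket changes, and startBackup() treats any bucket whose
last update is at or after its last successful backup as dirty, so
buckets that were never backed up to a replica (as after a node
restart) are retried instead of skipped.
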
Change-Id: I51cf3ee0682873beb7f9334dac1e77ed20022bfb
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 3080615..8efb40a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -181,14 +181,16 @@
protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
+ .register(BucketId.class)
.register(FlowBucket.class)
.build());
protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(MastershipBasedTimestamp.class);
+ .register(KryoNamespaces.API)
+ .register(BucketId.class)
+ .register(MastershipBasedTimestamp.class);
- private EventuallyConsistentMap<DeviceId, Map<Integer, Integer>> flowCounts;
+ private EventuallyConsistentMap<BucketId, Integer> flowCounts;
private IdGenerator idGenerator;
private NodeId local;
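
Note on the serializer hunk above: the new BucketId key type has to be registered with both the message serializer and the serializer builder used by the flowCounts map, since ONOS Kryo namespaces are typically registration-required and reject unregistered types at encode time. A minimal standalone sketch of the pattern, with a hypothetical MyKey standing in for BucketId:

    import org.onlab.util.KryoNamespace;
    import org.onosproject.store.serializers.KryoNamespaces;
    import org.onosproject.store.service.Serializer;

    public class SerializerSketch {
        // Hypothetical stand-in for the new BucketId key type.
        static class MyKey {
            String deviceId;
            int bucket;
        }

        public static void main(String[] args) {
            Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
                    .register(KryoNamespaces.API)
                    .register(MyKey.class) // omit this and encoding MyKey is expected to fail
                    .build());
            MyKey key = new MyKey();
            key.deviceId = "of:0000000000000001";
            key.bucket = 3;
            MyKey copy = serializer.decode(serializer.encode(key));
            System.out.println(copy.deviceId + "/" + copy.bucket);
        }
    }
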
@@ -215,7 +217,7 @@
backupPeriod,
TimeUnit.MILLISECONDS);
- flowCounts = storageService.<DeviceId, Map<Integer, Integer>>eventuallyConsistentMapBuilder()
+ flowCounts = storageService.<BucketId, Integer>eventuallyConsistentMapBuilder()
.withName("onos-flow-counts")
.withSerializer(serializerBuilder)
.withAntiEntropyPeriod(5, TimeUnit.SECONDS)
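
The flowCounts map itself changes shape here: instead of one nested Map<Integer, Integer> value per device, each (device, bucket) pair is its own entry, so concurrent updates to different buckets of the same device no longer rewrite a shared nested value, and anti-entropy reconciles counts bucket by bucket. An illustrative contrast using plain ConcurrentMaps in place of the eventually-consistent map:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class FlowCountsSketch {
        // Before: one replicated value per device; updating one bucket's count
        // means replacing the device's whole nested map.
        private final Map<String, Map<Integer, Integer>> nested = new ConcurrentHashMap<>();

        // After: one replicated value per (device, bucket); the string key is a
        // stand-in for the real BucketId type.
        private final Map<String, Integer> flat = new ConcurrentHashMap<>();

        public void updateNested(String deviceId, int bucket, int count) {
            nested.compute(deviceId, (id, counts) -> {
                Map<Integer, Integer> copy = counts == null ? new HashMap<>() : new HashMap<>(counts);
                copy.put(bucket, count);
                return copy; // the whole nested map is the unit of replication
            });
        }

        public void updateFlat(String deviceId, int bucket, int count) {
            flat.put(deviceId + "/" + bucket, count); // a single entry is the unit
        }
    }
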
@@ -338,9 +340,6 @@
prefix, msgHandlerPoolSize, backupPeriod, backupCount);
}
- // This is not a efficient operation on a distributed sharded
- // flow store. We need to revisit the need for this operation or at least
- // make it device specific.
@Override
public int getFlowRuleCount() {
return Streams.stream(deviceService.getDevices()).parallel()
@@ -350,11 +349,10 @@
@Override
public int getFlowRuleCount(DeviceId deviceId) {
- Map<Integer, Integer> counts = flowCounts.get(deviceId);
- return counts != null
- ? counts.values().stream().mapToInt(v -> v).sum()
- : flowTable.flowEntries.get(deviceId) != null
- ? flowTable.flowEntries.get(deviceId).keySet().size() : 0;
+ return flowCounts.entrySet().stream()
+ .filter(entry -> entry.getKey().deviceId().equals(deviceId))
+ .mapToInt(entry -> entry.getValue())
+ .sum();
}
@Override
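
With per-bucket entries, a device's rule count in getFlowRuleCount(DeviceId) above is just the sum over its buckets, and a bucket with no entry contributes nothing, which replaces the old null-guarded fallback to the local flow table. A self-contained sketch of the hashing and aggregation, with DeviceId/FlowId simplified to String/long and an illustrative bucket count:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class BucketCountSketch {
        static final int NUM_BUCKETS = 128; // illustrative; the store defines its own constant

        private final Map<String, Integer> flowCounts = new ConcurrentHashMap<>();

        static int bucket(long flowId) {
            return (int) (flowId % NUM_BUCKETS); // same hashing scheme as the store
        }

        void recordFlow(String deviceId, long flowId) {
            flowCounts.merge(deviceId + "/" + bucket(flowId), 1, Integer::sum);
        }

        int count(String deviceId) {
            return flowCounts.entrySet().stream()
                    .filter(e -> e.getKey().startsWith(deviceId + "/"))
                    .mapToInt(Map.Entry::getValue)
                    .sum();
        }
    }
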
@@ -658,43 +656,95 @@
}
}
+ /**
+ * Represents a backup of a distinct bucket to a distinct node.
+ */
private class BackupOperation {
private final NodeId nodeId;
- private final DeviceId deviceId;
- private final int bucket;
+ private final BucketId bucketId;
- public BackupOperation(NodeId nodeId, DeviceId deviceId, int bucket) {
+ BackupOperation(NodeId nodeId, BucketId bucketId) {
this.nodeId = nodeId;
- this.deviceId = deviceId;
- this.bucket = bucket;
+ this.bucketId = bucketId;
+ }
+
+ NodeId nodeId() {
+ return nodeId;
+ }
+
+ BucketId bucketId() {
+ return bucketId;
}
@Override
public int hashCode() {
- return Objects.hash(nodeId, deviceId, bucket);
+ return Objects.hash(nodeId, bucketId);
}
@Override
public boolean equals(Object other) {
if (other != null && other instanceof BackupOperation) {
BackupOperation that = (BackupOperation) other;
- return this.nodeId.equals(that.nodeId) &&
- this.deviceId.equals(that.deviceId) &&
- this.bucket == that.bucket;
- } else {
- return false;
+ return this.nodeId.equals(that.nodeId)
+ && this.bucketId.equals(that.bucketId);
}
+ return false;
}
}
- private class FlowBucket {
+ /**
+ * Identifies a distinct bucket of flows within a device's flow table.
+ */
+ private class BucketId {
private final DeviceId deviceId;
private final int bucket;
- private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table;
- FlowBucket(DeviceId deviceId, int bucket, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table) {
+ BucketId(DeviceId deviceId, int bucket) {
this.deviceId = deviceId;
this.bucket = bucket;
+ }
+
+ DeviceId deviceId() {
+ return deviceId;
+ }
+
+ int bucket() {
+ return bucket;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(deviceId, bucket);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other != null && other instanceof BucketId) {
+ BucketId that = (BucketId) other;
+ return this.deviceId.equals(that.deviceId)
+ && this.bucket == that.bucket;
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Container for flows in a specific bucket.
+ */
+ private class FlowBucket {
+ private final BucketId bucketId;
+ private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table;
+
+ BucketId bucketId() {
+ return bucketId;
+ }
+
+ Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table() {
+ return table;
+ }
+
+ FlowBucket(BucketId bucketId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table) {
+ this.bucketId = bucketId;
this.table = table;
}
}
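
BackupOperation and BucketId are plain value objects; their equals()/hashCode() overrides are what let independently constructed instances collide in lastBackupTimes and in the inFlightUpdates set, which is the basis of the add()-based dedup in startBackup() further down. A minimal sketch of that property with an illustrative Op type:

    import java.util.Objects;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class DedupSketch {
        static final class Op {
            final String nodeId;
            final String deviceId;
            final int bucket;

            Op(String nodeId, String deviceId, int bucket) {
                this.nodeId = nodeId;
                this.deviceId = deviceId;
                this.bucket = bucket;
            }

            @Override
            public int hashCode() {
                return Objects.hash(nodeId, deviceId, bucket);
            }

            @Override
            public boolean equals(Object other) {
                if (other instanceof Op) {
                    Op that = (Op) other;
                    return nodeId.equals(that.nodeId)
                            && deviceId.equals(that.deviceId)
                            && bucket == that.bucket;
                }
                return false;
            }
        }

        public static void main(String[] args) {
            Set<Op> inFlight = ConcurrentHashMap.newKeySet();
            System.out.println(inFlight.add(new Op("n1", "of:1", 3))); // true: first attempt claims the slot
            System.out.println(inFlight.add(new Op("n1", "of:1", 3))); // false: duplicate suppressed
        }
    }
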
@@ -706,8 +756,8 @@
flowEntries = Maps.newConcurrentMap();
private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
- private final Map<DeviceId, Map<Integer, Long>> lastUpdateTimes = Maps.newConcurrentMap();
- private final Map<NodeId, Set<DeviceId>> inFlightUpdates = Maps.newConcurrentMap();
+ private final Map<BucketId, Long> lastUpdateTimes = Maps.newConcurrentMap();
+ private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
@Override
public void event(ReplicaInfoEvent event) {
@@ -721,49 +771,12 @@
}
if (event.type() == MASTER_CHANGED) {
for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
- recordUpdate(deviceId, bucket);
+ recordUpdate(new BucketId(deviceId, bucket));
}
}
backupSenderExecutor.execute(this::backup);
}
- private CompletableFuture<Void> backupFlowEntries(
- NodeId nodeId, DeviceId deviceId, int bucket, long timestamp) {
- log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.", bucket, deviceId, nodeId);
- FlowBucket flowBucket = getFlowBucket(deviceId, bucket);
- int flowCount = flowBucket.table.entrySet().stream()
- .mapToInt(e -> e.getValue().values().size()).sum();
- flowCounts.compute(deviceId, (id, counts) -> {
- if (counts == null) {
- counts = Maps.newConcurrentMap();
- }
- counts.put(bucket, flowCount);
- return counts;
- });
-
- CompletableFuture<Void> future = new CompletableFuture<>();
- clusterCommunicator.<FlowBucket, Set<FlowId>>
- sendAndReceive(flowBucket,
- FLOW_TABLE_BACKUP,
- serializer::encode,
- serializer::decode,
- nodeId)
- .whenComplete((backedupFlows, error) -> {
- Set<FlowId> flowsNotBackedUp = error != null ?
- flowBucket.table.keySet() :
- Sets.difference(flowBucket.table.keySet(), backedupFlows);
- if (flowsNotBackedUp.size() > 0) {
- log.warn("Failed to backup flows: {}. Reason: {}, Node: {}",
- flowsNotBackedUp, error != null ? error.getMessage() : "none", nodeId);
- }
- if (backedupFlows != null) {
- lastBackupTimes.put(new BackupOperation(nodeId, deviceId, bucket), timestamp);
- }
- future.complete(null);
- });
- return future;
- }
-
/**
* Returns the flow table for the specified device.
*
@@ -775,49 +788,34 @@
return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
.<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
.withName("FlowTable:" + deviceId.toString())
- .withSerializer(new Serializer() {
- @Override
- public <T> byte[] encode(T object) {
- return serializer.encode(object);
- }
-
- @Override
- public <T> T decode(byte[] bytes) {
- return serializer.decode(bytes);
- }
-
- @Override
- public <T> T copy(T object) {
- return serializer.copy(object);
- }
- })
+ .withSerializer(serializer)
.build());
} else {
return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
}
}
- private FlowBucket getFlowBucket(DeviceId deviceId, int bucket) {
+ private FlowBucket getFlowBucket(BucketId bucketId) {
if (persistenceEnabled) {
- return new FlowBucket(deviceId, bucket, flowEntries.computeIfAbsent(deviceId, id ->
+ return new FlowBucket(bucketId, flowEntries.computeIfAbsent(bucketId.deviceId(), id ->
persistenceService.<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
- .withName("FlowTable:" + deviceId.toString())
+ .withName("FlowTable:" + bucketId.deviceId().toString())
.withSerializer(serializer)
.build())
.entrySet()
.stream()
- .filter(entry -> isBucket(entry.getKey(), bucket))
+ .filter(entry -> isInBucket(entry.getKey(), bucketId.bucket()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
} else {
Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
- flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap())
+ flowEntries.computeIfAbsent(bucketId.deviceId(), id -> Maps.newConcurrentMap())
.entrySet()
.stream()
- .filter(entry -> isBucket(entry.getKey(), bucket))
+ .filter(entry -> isInBucket(entry.getKey(), bucketId.bucket()))
.forEach(entry -> {
copy.put(entry.getKey(), Maps.newHashMap(entry.getValue()));
});
- return new FlowBucket(deviceId, bucket, copy);
+ return new FlowBucket(bucketId, copy);
}
}
@@ -843,7 +841,7 @@
return getFlowEntriesInternal(deviceId);
}
- private boolean isBucket(FlowId flowId, int bucket) {
+ private boolean isInBucket(FlowId flowId, int bucket) {
return bucket(flowId) == bucket;
}
@@ -851,9 +849,14 @@
return (int) (flowId.id() % NUM_BUCKETS);
}
- private void recordUpdate(DeviceId deviceId, int bucket) {
- lastUpdateTimes.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap())
- .put(bucket, System.currentTimeMillis());
+ private void recordUpdate(BucketId bucketId) {
+ lastUpdateTimes.put(bucketId, System.currentTimeMillis());
+ FlowBucket flowBucket = getFlowBucket(bucketId);
+ int flowCount = flowBucket.table().entrySet()
+ .stream()
+ .mapToInt(e -> e.getValue().values().size())
+ .sum();
+ flowCounts.put(bucketId, flowCount);
}
public void add(FlowEntry rule) {
@@ -861,7 +864,7 @@
.compute((StoredFlowEntry) rule, (k, stored) -> {
return (StoredFlowEntry) rule;
});
- recordUpdate(rule.deviceId(), bucket(rule.id()));
+ recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
}
public void update(FlowEntry rule) {
@@ -872,7 +875,7 @@
if (stored instanceof DefaultFlowEntry) {
DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
if (updated.created() >= storedEntry.created()) {
- recordUpdate(rule.deviceId(), bucket(rule.id()));
+ recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
return updated;
} else {
log.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
@@ -907,7 +910,7 @@
});
if (removedRule.get() != null) {
- recordUpdate(rule.deviceId(), bucket(rule.id()));
+ recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
return removedRule.get();
} else {
return null;
@@ -955,41 +958,78 @@
private void backup(NodeId nodeId, DeviceId deviceId) {
final long timestamp = System.currentTimeMillis();
for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
- long lastBackupTime = lastBackupTimes.getOrDefault(new BackupOperation(nodeId, deviceId, bucket), 0L);
- long lastUpdateTime = lastUpdateTimes.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap())
- .getOrDefault(bucket, 0L);
- if (lastBackupTime < lastUpdateTime && startBackup(nodeId, deviceId)) {
- backupFlowEntries(nodeId, deviceId, bucket, timestamp)
- .thenRunAsync(() -> {
- finishBackup(nodeId, deviceId);
- backup(nodeId);
- }, backupSenderExecutor);
- return;
+ BucketId bucketId = new BucketId(deviceId, bucket);
+ BackupOperation operation = new BackupOperation(nodeId, bucketId);
+ if (startBackup(operation)) {
+ backup(operation).whenCompleteAsync((succeeded, error) -> {
+ if (error == null && succeeded) {
+ succeedBackup(operation, timestamp);
+ } else {
+ failBackup(operation);
+ }
+ backup(nodeId);
+ }, backupSenderExecutor);
}
}
}
- private boolean startBackup(NodeId nodeId, DeviceId deviceId) {
- return inFlightUpdates.computeIfAbsent(nodeId, id -> Sets.newConcurrentHashSet()).add(deviceId);
+ private boolean startBackup(BackupOperation operation) {
+ long lastBackupTime = lastBackupTimes.getOrDefault(operation, 0L);
+ long lastUpdateTime = lastUpdateTimes.getOrDefault(operation.bucketId(), 0L);
+ return lastUpdateTime > 0 && lastBackupTime <= lastUpdateTime && inFlightUpdates.add(operation);
}
- private void finishBackup(NodeId nodeId, DeviceId deviceId) {
- inFlightUpdates.computeIfAbsent(nodeId, id -> Sets.newConcurrentHashSet()).remove(deviceId);
+ private void failBackup(BackupOperation operation) {
+ inFlightUpdates.remove(operation);
}
- private Set<FlowId> onBackupReceipt(FlowBucket bucket) {
- log.debug("Received flowEntries for {} bucket {} to backup", bucket.deviceId, bucket.bucket);
+ private void succeedBackup(BackupOperation operation, long timestamp) {
+ inFlightUpdates.remove(operation);
+ lastBackupTimes.put(operation, timestamp);
+ }
+
+ private CompletableFuture<Boolean> backup(BackupOperation operation) {
+ log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.",
+ operation.bucketId().bucket(), operation.bucketId().deviceId(), operation.nodeId());
+ FlowBucket flowBucket = getFlowBucket(operation.bucketId());
+
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ clusterCommunicator.<FlowBucket, Set<FlowId>>
+ sendAndReceive(flowBucket,
+ FLOW_TABLE_BACKUP,
+ serializer::encode,
+ serializer::decode,
+ operation.nodeId())
+ .whenComplete((backedupFlows, error) -> {
+ Set<FlowId> flowsNotBackedUp = error != null ?
+ flowBucket.table().keySet() :
+ Sets.difference(flowBucket.table().keySet(), backedupFlows);
+ if (!flowsNotBackedUp.isEmpty()) {
+ log.warn("Failed to backup flows: {}. Reason: {}, Node: {}",
+ flowsNotBackedUp, error != null ? error.getMessage() : "none", operation.nodeId());
+ }
+ future.complete(backedupFlows != null);
+ });
+ return future;
+ }
+
+ private Set<FlowId> onBackupReceipt(FlowBucket flowBucket) {
+ log.debug("Received flowEntries for {} bucket {} to backup",
+ flowBucket.bucketId().deviceId(), flowBucket.bucketId().bucket());
Set<FlowId> backedupFlows = Sets.newHashSet();
try {
// Only process those devices that are not managed by the local node.
- NodeId master = replicaInfoManager.getReplicaInfoFor(bucket.deviceId).master().orElse(null);
+ NodeId master = replicaInfoManager.getReplicaInfoFor(flowBucket.bucketId().deviceId())
+ .master()
+ .orElse(null);
if (!Objects.equals(local, master)) {
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable = getFlowTable(bucket.deviceId);
- backupFlowTable.putAll(bucket.table);
+ Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable =
+ getFlowTable(flowBucket.bucketId().deviceId());
+ backupFlowTable.putAll(flowBucket.table());
backupFlowTable.entrySet()
- .removeIf(entry -> isBucket(entry.getKey(), bucket.bucket)
- && !bucket.table.containsKey(entry.getKey()));
- backedupFlows.addAll(bucket.table.keySet());
+ .removeIf(entry -> isInBucket(entry.getKey(), flowBucket.bucketId().bucket())
+ && !flowBucket.table().containsKey(entry.getKey()));
+ backedupFlows.addAll(flowBucket.table().keySet());
}
} catch (Exception e) {
log.warn("Failure processing backup request", e);