FlowRuleStore: Consider errors when updating state of all current backups
Change-Id: I3bf4d20d79dc37c7040648ec6379794b8c93aad2
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index 3b82655..888f95b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -290,7 +290,7 @@
clusterCommunicator.addSubscriber(
REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
clusterCommunicator.addSubscriber(
- FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, executor);
+ FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
}
private void unregisterMessageHandlers() {
@@ -644,21 +644,33 @@
private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
- Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
+ Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
Maps.newConcurrentMap();
flowEntries.forEach((key, value) -> {
if (deviceIds.contains(key)) {
deviceFlowEntries.put(key, value);
}
});
- clusterCommunicator.unicast(deviceFlowEntries,
- FLOW_TABLE_BACKUP,
- SERIALIZER::encode,
- nodeId);
- deviceIds.forEach(id -> {
- lastBackupTimes.put(id, System.currentTimeMillis());
- lastBackupNodes.put(id, nodeId);
- });
+ clusterCommunicator.<Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive(
+ deviceFlowEntries,
+ FLOW_TABLE_BACKUP,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ nodeId)
+ .whenComplete((backedupDevices, error) -> {
+ Set<DeviceId> devicesNotBackedup = error != null ?
+ deviceFlowEntries.keySet() :
+ Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
+ if (devicesNotBackedup.size() > 0) {
+ log.warn("Failed to backup devices: {}", devicesNotBackedup, error);
+ }
+ if (backedupDevices != null) {
+ backedupDevices.forEach(id -> {
+ lastBackupTimes.put(id, System.currentTimeMillis());
+ lastBackupNodes.put(id, nodeId);
+ });
+ }
+ });
}
/**
@@ -751,16 +763,23 @@
}
}
- private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
+ private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
log.debug("Received flowEntries for {} to backup", flowTables.keySet());
- Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
- // Only process those devices are that not managed by the local node.
- Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
+ Set<DeviceId> backedupDevices = Sets.newHashSet();
+ try {
+ Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
+ // Only process those devices are that not managed by the local node.
+ Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
.forEach((deviceId, flowTable) -> {
Map<FlowId, Set<StoredFlowEntry>> deviceFlowTable = getFlowTable(deviceId);
deviceFlowTable.clear();
deviceFlowTable.putAll(flowTable);
+ backedupDevices.add(deviceId);
});
+ } catch (Exception e) {
+ log.warn("Failure processing backup request", e);
+ }
+ return backedupDevices;
}
}
}