ONOS-1883: Fix for lost flow rules on CLI-directed mastership changes.
- Made all mastership role change operations asynchronous in the API, reflecting that they are inherently asynchronous operations.
- In the flow rule store, we now check whether any new backups need to be made when a device's backup location (standby node) changes.
- In the device mastership store, we now wait briefly before stepping down from mastership after promoting a new candidate to highest standby (sketched below).
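
The mastership store change itself is not part of the hunks below; a minimal sketch of the intended step-down behaviour, with all class, method, and constant names assumed for illustration only (not the actual mastership store code), could look like:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class MastershipHandoffSketch {
        // assumed delay; the value used by the real store may differ
        private static final long HANDOFF_DELAY_MILLIS = 3000;

        private final ScheduledExecutorService transferExecutor =
                Executors.newSingleThreadScheduledExecutor();

        // Promote the chosen node to the head of the standby list, then
        // step down locally after a short delay rather than immediately,
        // giving the new master time to receive the latest flow backups.
        public void relinquishMastership(String deviceId, String newMaster) {
            promoteToTopOfCandidateList(deviceId, newMaster);
            transferExecutor.schedule(() -> stepDownFromMastership(deviceId),
                                      HANDOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS);
        }

        private void promoteToTopOfCandidateList(String deviceId, String nodeId) {
            // placeholder: move nodeId to the top of the candidate list for deviceId
        }

        private void stepDownFromMastership(String deviceId) {
            // placeholder: withdraw the local node from mastership of deviceId
        }
    }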
Change-Id: Icb76cf4d0d23403053a3fd5a458a940b847da49f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index e042982..8bfab64 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -62,6 +62,8 @@
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfo;
+import org.onosproject.store.flow.ReplicaInfoEvent;
+import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
@@ -181,6 +183,7 @@
registerMessageHandlers(messageHandlingExecutor);
if (backupEnabled) {
+ replicaInfoManager.addListener(flowTable);
backupTask = backupSenderExecutor.scheduleWithFixedDelay(
flowTable::backup,
0,
@@ -193,6 +196,10 @@
@Deactivate
public void deactivate(ComponentContext context) {
+ if (backupEnabled) {
+ replicaInfoManager.removeListener(flowTable);
+ backupTask.cancel(true);
+ }
configService.unregisterProperties(getClass(), false);
unregisterMessageHandlers();
messageHandlingExecutor.shutdownNow();
@@ -232,9 +239,14 @@
boolean restartBackupTask = false;
if (newBackupEnabled != backupEnabled) {
backupEnabled = newBackupEnabled;
- if (!backupEnabled && backupTask != null) {
- backupTask.cancel(false);
- backupTask = null;
+ if (!backupEnabled) {
+ replicaInfoManager.removeListener(flowTable);
+ if (backupTask != null) {
+ backupTask.cancel(false);
+ backupTask = null;
+ }
+ } else {
+ replicaInfoManager.addListener(flowTable);
}
restartBackupTask = backupEnabled;
}
@@ -590,7 +602,7 @@
}
}
- private class InternalFlowTable {
+ private class InternalFlowTable implements ReplicaInfoEventListener {
private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
flowEntries = new ConcurrentHashMap<>();
@@ -603,6 +615,43 @@
return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
}
+ @Override
+ public void event(ReplicaInfoEvent event) {
+ if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
+ DeviceId deviceId = event.subject();
+ if (!Objects.equal(local, replicaInfoManager.getReplicaInfoFor(deviceId).master())) {
+ // ignore since this event is for a device this node does not manage.
+ return;
+ }
+ NodeId latestBackupNode = getBackupNode(deviceId);
+ NodeId existingBackupNode = lastBackupNodes.get(deviceId);
+ if (Objects.equal(latestBackupNode, existingBackupNode)) {
+ // ignore since backup location hasn't changed.
+ return;
+ }
+ backupFlowEntries(latestBackupNode, Sets.newHashSet(deviceId));
+ }
+ }
+
+ private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
+ log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
+ Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
+ Maps.newConcurrentMap();
+ flowEntries.forEach((key, value) -> {
+ if (deviceIds.contains(key)) {
+ deviceFlowEntries.put(key, value);
+ }
+ });
+ clusterCommunicator.unicast(deviceFlowEntries,
+ FLOW_TABLE_BACKUP,
+ SERIALIZER::encode,
+ nodeId);
+ deviceIds.forEach(id -> {
+ lastBackupTimes.put(id, System.currentTimeMillis());
+ lastBackupNodes.put(id, nodeId);
+ });
+ }
+
/**
* Returns the flow table for specified device.
*
@@ -662,7 +711,6 @@
if (!backupEnabled) {
return;
}
- //TODO: Force backup when backups change.
try {
// determine the set of devices that we need to backup during this run.
Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
@@ -686,35 +734,15 @@
.add(deviceId);
}
});
-
// send the device flow entries to their respective backup nodes
- devicesToBackupByNode.forEach((nodeId, deviceIds) -> {
- Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
- Maps.newConcurrentMap();
- flowEntries.forEach((key, value) -> {
- if (deviceIds.contains(key)) {
- deviceFlowEntries.put(key, value);
- }
- });
- clusterCommunicator.unicast(deviceFlowEntries,
- FLOW_TABLE_BACKUP,
- SERIALIZER::encode,
- nodeId);
- });
-
- // update state for use in subsequent run.
- devicesToBackupByNode.forEach((node, devices) -> {
- devices.forEach(id -> {
- lastBackupTimes.put(id, System.currentTimeMillis());
- lastBackupNodes.put(id, node);
- });
- });
+ devicesToBackupByNode.forEach(this::backupFlowEntries);
} catch (Exception e) {
log.error("Backup failed.", e);
}
}
private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
+ log.debug("Received flows for {} to backup", flowTables.keySet());
Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
        // Only process those devices that are not managed by the local node.
Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))