Ensure backups are updated on change in DeviceFlowTable
Change-Id: I5f21879e84e6093839cf3603eba352a5ced267f2
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index 6bc5b5f..e60a552 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -688,6 +688,9 @@
case TERM_ACTIVE:
activateTerm(event.subject());
break;
+ case TERM_UPDATE:
+ updateTerm(event.subject());
+ break;
default:
break;
}
@@ -724,6 +727,23 @@
}
/**
+ * Handles an update to a term.
+ */
+ private void updateTerm(DeviceReplicaInfo replicaInfo) {
+ if (replicaInfo.term() == this.replicaInfo.term()) {
+ this.replicaInfo = replicaInfo;
+
+ // If the local node is neither the master or a backup for the device *and the term is active*,
+ // clear the flow table.
+ if (activeTerm == replicaInfo.term()
+ && !replicaInfo.isMaster(localNodeId)
+ && !replicaInfo.isBackup(localNodeId)) {
+ flowBuckets.values().forEach(bucket -> bucket.clear());
+ }
+ }
+ }
+
+ /**
* Sends a message to the given node wrapped in a Lamport timestamp.
* <p>
* Messages are sent in a {@link Timestamped} wrapper and are expected to be received in a {@link Timestamped}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 19bfd48..90c503b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -911,7 +911,7 @@
@Override
public void event(ReplicaInfoEvent event) {
- if (event.subject().equals(deviceId) && event.type() == ReplicaInfoEvent.Type.MASTER_CHANGED) {
+ if (event.subject().equals(deviceId)) {
onReplicaInfoChange(event.replicaInfo());
}
}
@@ -953,6 +953,8 @@
listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
}
listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
+ } else if (oldReplicaInfo.term() == replicaInfo.term()) {
+ listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java
index 29672d0..ab6262f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java
@@ -28,6 +28,7 @@
public enum Type {
TERM_START,
TERM_ACTIVE,
+ TERM_UPDATE,
TERM_END,
}