[ONOS-7379] Use atomic operations for flow table updates to avoid losing flows during updates
Change-Id: If15361f454b57cf551236674f757b8b7b08ec238
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index b50adf8..108f380 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -486,19 +486,18 @@
StoredFlowEntry entry;
switch (op.operator()) {
case ADD:
+ entry = new DefaultFlowEntry(op.target());
+ flowTable.add(entry);
+ return op;
case MODIFY:
entry = new DefaultFlowEntry(op.target());
- // always add requested FlowRule
- // Note: 2 equal FlowEntry may have different treatment
- flowTable.remove(entry.deviceId(), entry);
- flowTable.add(entry);
-
+ flowTable.update(entry);
return op;
case REMOVE:
entry = flowTable.getFlowEntry(op.target());
if (entry != null) {
- //FIXME modification of "stored" flow entry outside of flow table
entry.setState(FlowEntryState.PENDING_REMOVE);
+ flowTable.update(entry);
log.debug("Setting state of rule to pending remove: {}", entry);
return op;
}
@@ -550,7 +549,6 @@
// check if this new rule is an update to an existing entry
StoredFlowEntry stored = flowTable.getFlowEntry(rule);
if (stored != null) {
- //FIXME modification of "stored" flow entry outside of flow table
stored.setBytes(rule.bytes());
stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
stored.setLiveType(rule.liveType());
@@ -558,6 +556,8 @@
stored.setLastSeen();
if (stored.state() == FlowEntryState.PENDING_ADD) {
stored.setState(FlowEntryState.ADDED);
+ // Update the flow table to ensure the changes are replicated
+ flowTable.update(stored);
return new FlowRuleEvent(Type.RULE_ADDED, rule);
}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
@@ -597,9 +597,8 @@
}
private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
- final DeviceId deviceId = rule.deviceId();
// This is where one could mark a rule as removed and still keep it in the store.
- final FlowEntry removed = flowTable.remove(deviceId, rule);
+ final FlowEntry removed = flowTable.remove(rule);
// rule may be partial rule that is missing treatment, we should use rule from store instead
return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
}
@@ -838,18 +837,36 @@
}
public void add(FlowEntry rule) {
- getFlowEntriesInternal(rule.deviceId(), rule.id())
- .compute((StoredFlowEntry) rule, (k, stored) -> {
- //TODO compare stored and rule timestamps
- //TODO the key is not updated
- return (StoredFlowEntry) rule;
- });
- lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+ StoredFlowEntry stored = getFlowEntriesInternal(rule.deviceId(), rule.id())
+ .putIfAbsent((StoredFlowEntry) rule, (StoredFlowEntry) rule);
+ if (stored == null) {
+ lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+ }
}
- public FlowEntry remove(DeviceId deviceId, FlowEntry rule) {
+ public void update(FlowEntry rule) {
+ getFlowEntriesInternal(rule.deviceId(), rule.id())
+ .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
+ if (rule instanceof DefaultFlowEntry) {
+ DefaultFlowEntry updated = (DefaultFlowEntry) rule;
+ if (stored instanceof DefaultFlowEntry) {
+ DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
+ if (updated.created() >= storedEntry.created()) {
+ lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+ return updated;
+ } else {
+ log.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
+ return stored;
+ }
+ }
+ }
+ return stored;
+ });
+ }
+
+ public FlowEntry remove(FlowEntry rule) {
final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
- final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(deviceId);
+ final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(rule.deviceId());
flowTable.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
if (rule instanceof DefaultFlowEntry) {
@@ -857,8 +874,7 @@
if (stored instanceof DefaultFlowEntry) {
DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
if (toRemove.created() < storedEntry.created()) {
- log.debug("Trying to remove more recent flow entry {} (stored: {})",
- toRemove, stored);
+ log.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
// the key is not updated, removedRule remains null
return stored;
}
@@ -871,7 +887,7 @@
});
if (removedRule.get() != null) {
- lastUpdateTimes.put(deviceId, System.currentTimeMillis());
+ lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
return removedRule.get();
} else {
return null;