Fix inconsistent update type during concurrent P4Runtime writes
This is achieved by optimistically updating the P4Runtime mirror using
the write request (instead of waiting for a response) and by serializing
the construction of write requests for the same device.
This change requires updating the P4Runtime protocol classes to expose
the content of the write request.
It also includes:
- force member weight to 1 when reading groups (some server
implementations still fail to comply with the spec)
- remove unused operation timeout handling in GDP (now all RPCs have a
timeout)
Change-Id: Ib4f99a6085c1283f46a2797e0c883d96954e02e9
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
index fca05cb..01af5eb 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
@@ -18,7 +18,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.onlab.util.SharedExecutors;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Striped;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileGroupMirror;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileMemberMirror;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMirror;
@@ -43,7 +44,8 @@
import org.onosproject.net.pi.service.PiTranslatedEntity;
import org.onosproject.net.pi.service.PiTranslationException;
import org.onosproject.p4runtime.api.P4RuntimeReadClient;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
import java.util.Collection;
import java.util.Collections;
@@ -52,6 +54,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toMap;
@@ -70,6 +74,10 @@
private static final String READ_ACTION_GROUPS_FROM_MIRROR = "actionGroupReadFromMirror";
private static final boolean DEFAULT_READ_ACTION_GROUPS_FROM_MIRROR = false;
+ // Used to make sure concurrent calls to write groups are serialized so
+ // that each request gets consistent access to mirror state.
+ private static final Striped<Lock> WRITE_LOCKS = Striped.lock(30);
+
protected GroupStore groupStore;
private P4RuntimeActionProfileGroupMirror groupMirror;
private P4RuntimeActionProfileMemberMirror memberMirror;
@@ -180,11 +188,23 @@
log.warn("Cleaning up {} action profile groups and " +
"{} members on {}...",
groupHandlesToRemove.size(), memberHandlesToRemove.size(), deviceId);
- SharedExecutors.getSingleThreadExecutor().execute(
- () -> submitWriteRequestAndUpdateMirror(
- client.write(pipeconf)
- .delete(groupHandlesToRemove)
- .delete(memberHandlesToRemove)));
+ final WriteRequest deleteRequest = client.write(pipeconf)
+ .delete(groupHandlesToRemove)
+ .delete(memberHandlesToRemove);
+ WRITE_LOCKS.get(deviceId).lock();
+ try {
+ updateMirrorAndSubmit(deleteRequest).whenComplete((r, ex) -> {
+ if (ex != null) {
+ log.error("Exception removing inconsistent group/members", ex);
+ } else {
+ log.debug("Completed removal of inconsistent " +
+ "groups/members ({} of {} updates succeeded)",
+ r.success(), r.all());
+ }
+ });
+ } finally {
+ WRITE_LOCKS.get(deviceId).unlock();
+ }
}
// Done.
@@ -294,30 +314,34 @@
if (members == null) {
return false;
}
- final P4RuntimeWriteClient.WriteRequest request = client.write(pipeconf);
- // FIXME: when operation is remove, should we remove members first? Same
- // thing when modifying a group, should we first modify the group then
- // remove the member?
- final boolean allMembersSkipped = members.stream()
- .allMatch(m -> appendEntityToWriteRequestOrSkip(
- request, m.handle(deviceId), m, memberMirror, operation));
- final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
- request, groupHandle, group, groupMirror, operation);
- if (allMembersSkipped && groupSkipped) {
- return true;
- } else {
- // True if all entities in the request (groups and members) where
- // written successfully.
- return submitWriteRequestAndUpdateMirror(request).isSuccess();
+ final WriteRequest request = client.write(pipeconf);
+ final CompletableFuture<WriteResponse> futureResponse;
+ WRITE_LOCKS.get(deviceId).lock();
+ try {
+ // FIXME: when operation is remove, should we remove members first? Same
+ // thing when modifying a group, should we first modify the group then
+ // remove the member?
+ final boolean allMembersSkipped = members.stream()
+ .allMatch(m -> appendEntityToWriteRequestOrSkip(
+ request, m.handle(deviceId), m, memberMirror, operation));
+ final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
+ request, groupHandle, group, groupMirror, operation);
+ if (allMembersSkipped && groupSkipped) {
+ return true;
+ } else {
+ futureResponse = updateMirrorAndSubmit(request);
+ }
+ } finally {
+ WRITE_LOCKS.get(deviceId).unlock();
}
+ // Wait response from device. Returns true if all entities in the
+ // request (groups and members) were written successfully.
+ return Futures.getUnchecked(futureResponse).isSuccess();
}
private <H extends PiHandle, E extends PiEntity> boolean appendEntityToWriteRequestOrSkip(
- P4RuntimeWriteClient.WriteRequest writeRequest,
- H handle,
- E entityToApply,
- P4RuntimeMirror<H, E> mirror,
- Operation operation) {
+ WriteRequest writeRequest, H handle, E entityToApply,
+ P4RuntimeMirror<H, E> mirror, Operation operation) {
// Should return true if there's no need to write entity on device,
// false if the write request is modified or an error occurs.
final TimedEntry<E> entityOnDevice = mirror.get(handle);
@@ -362,12 +386,10 @@
return instances;
}
- private P4RuntimeWriteClient.WriteResponse submitWriteRequestAndUpdateMirror(
- P4RuntimeWriteClient.WriteRequest request) {
- final P4RuntimeWriteClient.WriteResponse response = request.submitSync();
- groupMirror.replayWriteResponse(response);
- memberMirror.replayWriteResponse(response);
- return response;
+ private CompletableFuture<WriteResponse> updateMirrorAndSubmit(WriteRequest request) {
+ groupMirror.applyWriteRequest(request);
+ memberMirror.applyWriteRequest(request);
+ return request.submit();
}
enum Operation {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index f905c4c..8a1f99a 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -19,7 +19,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.onlab.util.SharedExecutors;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Striped;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeTableMirror;
import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
import org.onosproject.net.flow.DefaultFlowEntry;
@@ -42,8 +43,9 @@
import org.onosproject.net.pi.service.PiTranslatedEntity;
import org.onosproject.net.pi.service.PiTranslationException;
import org.onosproject.p4runtime.api.P4RuntimeReadClient;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
import java.util.Collection;
import java.util.Collections;
@@ -52,6 +54,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import static org.onosproject.drivers.p4runtime.P4RuntimeFlowRuleProgrammable.Operation.APPLY;
@@ -97,6 +101,10 @@
private static final String TABLE_DEFAULT_AS_ENTRY = "tableDefaultAsEntry";
private static final boolean DEFAULT_TABLE_DEFAULT_AS_ENTRY = false;
+ // Used to make sure concurrent calls to write flow rules are serialized so
+ // that each request gets consistent access to mirror state.
+ private static final Striped<Lock> WRITE_LOCKS = Striped.lock(30);
+
private PiPipelineModel pipelineModel;
private P4RuntimeTableMirror tableMirror;
private PiFlowRuleTranslator translator;
@@ -165,8 +173,25 @@
if (!inconsistentEntries.isEmpty()) {
// Trigger clean up of inconsistent entries.
- SharedExecutors.getSingleThreadExecutor().execute(
- () -> cleanUpInconsistentEntries(inconsistentEntries));
+ log.warn("Found {} inconsistent table entries on {}, removing them...",
+ inconsistentEntries.size(), deviceId);
+ final WriteRequest request = client.write(pipeconf)
+ .entities(inconsistentEntries, DELETE);
+ WRITE_LOCKS.get(deviceId).lock();
+ // Update mirror and async submit delete request.
+ try {
+ tableMirror.applyWriteRequest(request);
+ request.submit().whenComplete((response, ex) -> {
+ if (ex != null) {
+ log.error("Exception removing inconsistent table entries", ex);
+ } else {
+ log.debug("Successfully removed {} out of {} inconsistent entries",
+ response.success().size(), response.all().size());
+ }
+ });
+ } finally {
+ WRITE_LOCKS.get(deviceId).unlock();
+ }
}
return result.build();
@@ -246,48 +271,48 @@
.collect(Collectors.toList());
}
- private void cleanUpInconsistentEntries(Collection<PiTableEntry> piEntries) {
- log.warn("Found {} inconsistent table entries on {}, removing them...",
- piEntries.size(), deviceId);
- // Remove entries and update mirror.
- tableMirror.replayWriteResponse(
- client.write(pipeconf).entities(piEntries, DELETE).submitSync());
- }
-
private Collection<FlowRule> processFlowRules(Collection<FlowRule> rules,
Operation driverOperation) {
if (!setupBehaviour() || rules.isEmpty()) {
return Collections.emptyList();
}
// Created batched write request.
- final P4RuntimeWriteClient.WriteRequest request = client.write(pipeconf);
+ final WriteRequest request = client.write(pipeconf);
// For each rule, translate to PI and append to write request.
final Map<PiHandle, FlowRule> handleToRuleMap = Maps.newHashMap();
final List<FlowRule> skippedRules = Lists.newArrayList();
- for (FlowRule rule : rules) {
- final PiTableEntry entry;
- try {
- entry = translator.translate(rule, pipeconf);
- } catch (PiTranslationException e) {
- log.warn("Unable to translate flow rule for pipeconf '{}': {} [{}]",
- pipeconf.id(), e.getMessage(), rule);
- // Next rule.
- continue;
+ final CompletableFuture<WriteResponse> futureResponse;
+ WRITE_LOCKS.get(deviceId).lock();
+ try {
+ for (FlowRule rule : rules) {
+ final PiTableEntry entry;
+ try {
+ entry = translator.translate(rule, pipeconf);
+ } catch (PiTranslationException e) {
+ log.warn("Unable to translate flow rule for pipeconf '{}': {} [{}]",
+ pipeconf.id(), e.getMessage(), rule);
+ // Next rule.
+ continue;
+ }
+ final PiTableEntryHandle handle = entry.handle(deviceId);
+ handleToRuleMap.put(handle, rule);
+ // Append entry to batched write request (returns false), or skip (true)
+ if (appendEntryToWriteRequestOrSkip(
+ request, handle, entry, driverOperation)) {
+ skippedRules.add(rule);
+ updateTranslationStore(
+ driverOperation, handle, rule, entry);
+ }
}
- final PiTableEntryHandle handle = entry.handle(deviceId);
- handleToRuleMap.put(handle, rule);
- // Append entry to batched write request (returns false), or skip (true)
- if (appendEntryToWriteRequestOrSkip(
- request, handle, entry, driverOperation)) {
- skippedRules.add(rule);
- updateTranslationStore(
- driverOperation, handle, rule, entry);
- }
+ // Update mirror.
+ tableMirror.applyWriteRequest(request);
+ // Async submit request to server.
+ futureResponse = request.submit();
+ } finally {
+ WRITE_LOCKS.get(deviceId).unlock();
}
- // Submit request to server.
- final P4RuntimeWriteClient.WriteResponse response = request.submitSync();
- // Update mirror.
- tableMirror.replayWriteResponse(response);
+ // Wait for response.
+ final WriteResponse response = Futures.getUnchecked(futureResponse);
// Derive successfully applied flow rule from response.
final List<FlowRule> appliedRules = getAppliedFlowRulesAndUpdateTranslator(
response, handleToRuleMap, driverOperation);
@@ -297,7 +322,7 @@
}
private List<FlowRule> getAppliedFlowRulesAndUpdateTranslator(
- P4RuntimeWriteClient.WriteResponse response,
+ WriteResponse response,
Map<PiHandle, FlowRule> handleToFlowRuleMap,
Operation driverOperation) {
// Returns a list of flow rules that were successfully written on the
@@ -359,7 +384,7 @@
}
private boolean appendEntryToWriteRequestOrSkip(
- final P4RuntimeWriteClient.WriteRequest writeRequest,
+ final WriteRequest writeRequest,
final PiTableEntryHandle handle,
PiTableEntry piEntryToApply,
final Operation driverOperation) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
index fb9451c..4daeaaa 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
@@ -234,9 +234,9 @@
@Override
@SuppressWarnings("unchecked")
- public void replayWriteResponse(P4RuntimeWriteClient.WriteResponse response) {
- response.success().stream()
- .filter(r -> r.entityType().equals(this.entityType) && r.isSuccess())
+ public void applyWriteRequest(P4RuntimeWriteClient.WriteRequest request) {
+ request.pendingUpdates().stream()
+ .filter(r -> r.entityType().equals(this.entityType))
.forEach(r -> {
switch (r.updateType()) {
case INSERT:
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
index bee8c51..b836455 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
@@ -94,16 +94,17 @@
/**
* Synchronizes the state of the given device ID with the given collection
* of PI entities.
+ *
* @param deviceId device ID
* @param entities collection of PI entities
*/
void sync(DeviceId deviceId, Collection<E> entities);
/**
- * Uses the given P4Runtime write response to update the state of this
- * mirror.
+ * Uses the given P4Runtime write request to update the state of this
+ * mirror by optimistically assuming that all updates in it will succeed.
*
- * @param response P4Runtime write response
+ * @param request P4Runtime write request
*/
- void replayWriteResponse(P4RuntimeWriteClient.WriteResponse response);
+ void applyWriteRequest(P4RuntimeWriteClient.WriteRequest request);
}