Fix inconsistent update type during concurrent P4Runtime writes
This is achieved by optimistically updating the P4Runtime mirror using
the write request (instead of waiting for a response) and by serializing
the building of write requests for the same device.
This change requires updating the P4Runtime protocol classes to expose
the content of the write request.
It also includes:
- force member weight to 1 when reading groups (some server
implementations still fail to comply with the spec)
- remove unused operation timeout handling in GDP (now all RPCs have a
timeout)
Change-Id: Ib4f99a6085c1283f46a2797e0c883d96954e02e9
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
index fca05cb..01af5eb 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
@@ -18,7 +18,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.onlab.util.SharedExecutors;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Striped;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileGroupMirror;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileMemberMirror;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMirror;
@@ -43,7 +44,8 @@
import org.onosproject.net.pi.service.PiTranslatedEntity;
import org.onosproject.net.pi.service.PiTranslationException;
import org.onosproject.p4runtime.api.P4RuntimeReadClient;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
import java.util.Collection;
import java.util.Collections;
@@ -52,6 +54,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toMap;
@@ -70,6 +74,10 @@
private static final String READ_ACTION_GROUPS_FROM_MIRROR = "actionGroupReadFromMirror";
private static final boolean DEFAULT_READ_ACTION_GROUPS_FROM_MIRROR = false;
+ // Used to make sure concurrent calls to write groups are serialized so
+ // that each request gets consistent access to mirror state.
+ private static final Striped<Lock> WRITE_LOCKS = Striped.lock(30);
+
protected GroupStore groupStore;
private P4RuntimeActionProfileGroupMirror groupMirror;
private P4RuntimeActionProfileMemberMirror memberMirror;
@@ -180,11 +188,23 @@
log.warn("Cleaning up {} action profile groups and " +
"{} members on {}...",
groupHandlesToRemove.size(), memberHandlesToRemove.size(), deviceId);
- SharedExecutors.getSingleThreadExecutor().execute(
- () -> submitWriteRequestAndUpdateMirror(
- client.write(pipeconf)
- .delete(groupHandlesToRemove)
- .delete(memberHandlesToRemove)));
+ final WriteRequest deleteRequest = client.write(pipeconf)
+ .delete(groupHandlesToRemove)
+ .delete(memberHandlesToRemove);
+ WRITE_LOCKS.get(deviceId).lock();
+ try {
+ updateMirrorAndSubmit(deleteRequest).whenComplete((r, ex) -> {
+ if (ex != null) {
+ log.error("Exception removing inconsistent group/members", ex);
+ } else {
+ log.debug("Completed removal of inconsistent " +
+ "groups/members ({} of {} updates succeeded)",
+ r.success(), r.all());
+ }
+ });
+ } finally {
+ WRITE_LOCKS.get(deviceId).unlock();
+ }
}
// Done.
@@ -294,30 +314,34 @@
if (members == null) {
return false;
}
- final P4RuntimeWriteClient.WriteRequest request = client.write(pipeconf);
- // FIXME: when operation is remove, should we remove members first? Same
- // thing when modifying a group, should we first modify the group then
- // remove the member?
- final boolean allMembersSkipped = members.stream()
- .allMatch(m -> appendEntityToWriteRequestOrSkip(
- request, m.handle(deviceId), m, memberMirror, operation));
- final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
- request, groupHandle, group, groupMirror, operation);
- if (allMembersSkipped && groupSkipped) {
- return true;
- } else {
- // True if all entities in the request (groups and members) where
- // written successfully.
- return submitWriteRequestAndUpdateMirror(request).isSuccess();
+ final WriteRequest request = client.write(pipeconf);
+ final CompletableFuture<WriteResponse> futureResponse;
+ WRITE_LOCKS.get(deviceId).lock();
+ try {
+ // FIXME: when operation is remove, should we remove members first? Same
+ // thing when modifying a group, should we first modify the group then
+ // remove the member?
+ final boolean allMembersSkipped = members.stream()
+ .allMatch(m -> appendEntityToWriteRequestOrSkip(
+ request, m.handle(deviceId), m, memberMirror, operation));
+ final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
+ request, groupHandle, group, groupMirror, operation);
+ if (allMembersSkipped && groupSkipped) {
+ return true;
+ } else {
+ futureResponse = updateMirrorAndSubmit(request);
+ }
+ } finally {
+ WRITE_LOCKS.get(deviceId).unlock();
}
+ // Wait response from device. Returns true if all entities in the
+ // request (groups and members) were written successfully.
+ return Futures.getUnchecked(futureResponse).isSuccess();
}
private <H extends PiHandle, E extends PiEntity> boolean appendEntityToWriteRequestOrSkip(
- P4RuntimeWriteClient.WriteRequest writeRequest,
- H handle,
- E entityToApply,
- P4RuntimeMirror<H, E> mirror,
- Operation operation) {
+ WriteRequest writeRequest, H handle, E entityToApply,
+ P4RuntimeMirror<H, E> mirror, Operation operation) {
// Should return true if there's no need to write entity on device,
// false if the write request is modified or an error occurs.
final TimedEntry<E> entityOnDevice = mirror.get(handle);
@@ -362,12 +386,10 @@
return instances;
}
- private P4RuntimeWriteClient.WriteResponse submitWriteRequestAndUpdateMirror(
- P4RuntimeWriteClient.WriteRequest request) {
- final P4RuntimeWriteClient.WriteResponse response = request.submitSync();
- groupMirror.replayWriteResponse(response);
- memberMirror.replayWriteResponse(response);
- return response;
+ private CompletableFuture<WriteResponse> updateMirrorAndSubmit(WriteRequest request) {
+ groupMirror.applyWriteRequest(request);
+ memberMirror.applyWriteRequest(request);
+ return request.submit();
}
enum Operation {