Fix inconsistent update type during concurrent P4Runtime writes
This is achieved by optimistically updating the P4Runtime mirror using
the write request (instead of waiting for a response) and by serializing
the building of write requests for the same device.
This change requires updating the P4Runtime protocol classes to expose
the content of the write request.
It also includes:
- force member weight to 1 when reading groups that report a weight
lower than 1 (some server implementations still fail to comply with the
spec)
- remove unused operation timeout handling in GDP (all RPCs now have a
timeout)
Change-Id: Ib4f99a6085c1283f46a2797e0c883d96954e02e9
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
index 5b5d087..fb3a5fb 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
@@ -27,9 +27,12 @@
import org.slf4j.Logger;
import p4.v1.P4RuntimeOuterClass;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl.SHORT_TIMEOUT_SECONDS;
@@ -49,6 +52,7 @@
private final P4RuntimeClientImpl client;
private final PiPipeconf pipeconf;
+ private final AtomicBoolean submitted = new AtomicBoolean(false);
// The P4Runtime WriteRequest protobuf message we need to populate.
private final P4RuntimeOuterClass.WriteRequest.Builder requestMsg;
// WriteResponse instance builder. We populate entity responses as we add new
@@ -147,7 +151,14 @@
}
@Override
+ public Collection<P4RuntimeWriteClient.EntityUpdateRequest> pendingUpdates() {
+ return responseBuilder.pendingUpdates();
+ }
+
+ @Override
public CompletableFuture<P4RuntimeWriteClient.WriteResponse> submit() {
+ checkState(!submitted.getAndSet(true),
+ "Request has already been submitted, cannot submit again");
final P4RuntimeOuterClass.WriteRequest writeRequest = requestMsg
.setElectionId(client.lastUsedElectionId())
.build();
@@ -191,6 +202,8 @@
private void appendToRequestMsg(P4RuntimeWriteClient.UpdateType updateType,
PiEntity piEntity, PiHandle handle) {
+ checkState(!submitted.get(),
+ "Request has already been submitted, cannot add more entities");
final P4RuntimeOuterClass.Update.Type p4UpdateType;
final P4RuntimeOuterClass.Entity entityMsg;
try {
@@ -213,7 +226,7 @@
} catch (CodecException e) {
responseBuilder.addFailedResponse(
handle, piEntity, updateType, e.getMessage(),
- P4RuntimeWriteClient.WriteResponseStatus.CODEC_ERROR);
+ P4RuntimeWriteClient.EntityUpdateStatus.CODEC_ERROR);
}
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
index f24c717..35ee132 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
@@ -35,9 +35,10 @@
import org.onosproject.net.pi.runtime.PiEntityType;
import org.onosproject.net.pi.runtime.PiHandle;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateResponse;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateStatus;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteEntityResponse;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponseStatus;
import org.slf4j.Logger;
import p4.v1.P4RuntimeOuterClass;
@@ -46,7 +47,9 @@
import java.util.List;
import java.util.Map;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
import static org.slf4j.LoggerFactory.getLogger;
@@ -68,12 +71,12 @@
private static final Logger log = getLogger(WriteResponseImpl.class);
- private final ImmutableList<WriteEntityResponse> entityResponses;
- private final ImmutableListMultimap<WriteResponseStatus, WriteEntityResponse> statusMultimap;
+ private final ImmutableList<EntityUpdateResponse> entityResponses;
+ private final ImmutableListMultimap<EntityUpdateStatus, EntityUpdateResponse> statusMultimap;
private WriteResponseImpl(
- ImmutableList<WriteEntityResponse> allResponses,
- ImmutableListMultimap<WriteResponseStatus, WriteEntityResponse> statusMultimap) {
+ ImmutableList<EntityUpdateResponse> allResponses,
+ ImmutableListMultimap<EntityUpdateStatus, EntityUpdateResponse> statusMultimap) {
this.entityResponses = allResponses;
this.statusMultimap = statusMultimap;
}
@@ -84,25 +87,25 @@
}
@Override
- public Collection<WriteEntityResponse> all() {
+ public Collection<EntityUpdateResponse> all() {
return entityResponses;
}
@Override
- public Collection<WriteEntityResponse> success() {
- return statusMultimap.get(WriteResponseStatus.OK);
+ public Collection<EntityUpdateResponse> success() {
+ return statusMultimap.get(EntityUpdateStatus.OK);
}
@Override
- public Collection<WriteEntityResponse> failed() {
+ public Collection<EntityUpdateResponse> failed() {
return isSuccess()
? Collections.emptyList()
: entityResponses.stream().filter(r -> !r.isSuccess()).collect(toList());
}
@Override
- public Collection<WriteEntityResponse> status(
- WriteResponseStatus status) {
+ public Collection<EntityUpdateResponse> status(
+ EntityUpdateStatus status) {
checkNotNull(status);
return statusMultimap.get(status);
}
@@ -123,11 +126,11 @@
static final class Builder {
private final DeviceId deviceId;
- private final Map<Integer, WriteEntityResponseImpl> pendingResponses =
+ private final Map<Integer, EntityUpdateResponseImpl> pendingResponses =
Maps.newHashMap();
- private final List<WriteEntityResponse> allResponses =
+ private final List<EntityUpdateResponse> allResponses =
Lists.newArrayList();
- private final ListMultimap<WriteResponseStatus, WriteEntityResponse> statusMap =
+ private final ListMultimap<EntityUpdateStatus, EntityUpdateResponse> statusMap =
ArrayListMultimap.create();
private Builder(DeviceId deviceId) {
@@ -136,7 +139,7 @@
void addPendingResponse(PiHandle handle, PiEntity entity, UpdateType updateType) {
synchronized (this) {
- final WriteEntityResponseImpl resp = new WriteEntityResponseImpl(
+ final EntityUpdateResponseImpl resp = new EntityUpdateResponseImpl(
handle, entity, updateType);
allResponses.add(resp);
pendingResponses.put(pendingResponses.size(), resp);
@@ -144,21 +147,28 @@
}
void addFailedResponse(PiHandle handle, PiEntity entity, UpdateType updateType,
- String explanation, WriteResponseStatus status) {
+ String explanation, EntityUpdateStatus status) {
synchronized (this) {
- final WriteEntityResponseImpl resp = new WriteEntityResponseImpl(
+ final EntityUpdateResponseImpl resp = new EntityUpdateResponseImpl(
handle, entity, updateType)
.withFailure(explanation, status);
allResponses.add(resp);
}
}
+ Collection<EntityUpdateRequest> pendingUpdates() {
+ return ImmutableList.copyOf(pendingResponses.values());
+ }
+
WriteResponseImpl buildAsIs() {
synchronized (this) {
- if (!pendingResponses.isEmpty()) {
+ final long pendingCount = pendingResponses.values().stream()
+ .filter(r -> r.status() == EntityUpdateStatus.PENDING)
+ .count();
+ if (pendingCount > 0) {
log.warn("Partial response from {}, {} of {} total " +
- "entities are in status PENDING",
- deviceId, pendingResponses.size(), allResponses.size());
+ "updates are still in status PENDING",
+ deviceId, pendingCount, allResponses.size());
}
return new WriteResponseImpl(
ImmutableList.copyOf(allResponses),
@@ -169,7 +179,6 @@
WriteResponseImpl setSuccessAllAndBuild() {
synchronized (this) {
pendingResponses.values().forEach(this::doSetSuccess);
- pendingResponses.clear();
return buildAsIs();
}
}
@@ -177,7 +186,6 @@
WriteResponseImpl setFailAllAndBuild(Throwable throwable) {
synchronized (this) {
pendingResponses.values().forEach(r -> r.setFailure(throwable));
- pendingResponses.clear();
return buildAsIs();
}
}
@@ -190,8 +198,8 @@
private void setSuccess(int index) {
synchronized (this) {
- final WriteEntityResponseImpl resp = pendingResponses.remove(index);
- if (resp != null) {
+ final EntityUpdateResponseImpl resp = pendingResponses.get(index);
+ if (resp != null && resp.status == EntityUpdateStatus.PENDING) {
doSetSuccess(resp);
} else {
log.error("Missing pending response at index {}", index);
@@ -199,16 +207,16 @@
}
}
- private void doSetSuccess(WriteEntityResponseImpl resp) {
+ private void doSetSuccess(EntityUpdateResponseImpl resp) {
resp.setSuccess();
- statusMap.put(WriteResponseStatus.OK, resp);
+ statusMap.put(EntityUpdateStatus.OK, resp);
}
private void setFailure(int index,
String explanation,
- WriteResponseStatus status) {
+ EntityUpdateStatus status) {
synchronized (this) {
- final WriteEntityResponseImpl resp = pendingResponses.remove(index);
+ final EntityUpdateResponseImpl resp = pendingResponses.get(index);
if (resp != null) {
resp.withFailure(explanation, status);
statusMap.put(status, resp);
@@ -274,7 +282,7 @@
"P4Runtime Error message format not recognized [%s]",
TextFormat.shortDebugString(any));
if (reconcilable) {
- setFailure(index, unpackErr, WriteResponseStatus.OTHER_ERROR);
+ setFailure(index, unpackErr, EntityUpdateStatus.OTHER_ERROR);
} else {
log.warn(unpackErr);
}
@@ -283,7 +291,7 @@
// Map gRPC status codes to our WriteResponseStatus codes.
final Status.Code p4Code = Status.fromCodeValue(
p4Error.getCanonicalCode()).getCode();
- final WriteResponseStatus ourCode;
+ final EntityUpdateStatus ourCode;
switch (p4Code) {
case OK:
if (reconcilable) {
@@ -291,17 +299,17 @@
}
return;
case NOT_FOUND:
- ourCode = WriteResponseStatus.NOT_FOUND;
+ ourCode = EntityUpdateStatus.NOT_FOUND;
break;
case ALREADY_EXISTS:
- ourCode = WriteResponseStatus.ALREADY_EXIST;
+ ourCode = EntityUpdateStatus.ALREADY_EXIST;
break;
default:
- ourCode = WriteResponseStatus.OTHER_ERROR;
+ ourCode = EntityUpdateStatus.OTHER_ERROR;
break;
}
// Put the p4Code in the explanation only if ourCode is OTHER_ERROR.
- final String explanationCode = ourCode == WriteResponseStatus.OTHER_ERROR
+ final String explanationCode = ourCode == EntityUpdateStatus.OTHER_ERROR
? p4Code.name() + " " : "";
final String details = p4Error.hasDetails()
? ", " + p4Error.getDetails().toString() : "";
@@ -317,42 +325,50 @@
}
/**
- * Internal implementation of WriteEntityResponse.
+ * Internal implementation of EntityUpdateResponse.
*/
- private static final class WriteEntityResponseImpl implements WriteEntityResponse {
+ private static final class EntityUpdateResponseImpl implements EntityUpdateResponse {
private final PiHandle handle;
private final PiEntity entity;
private final UpdateType updateType;
- private WriteResponseStatus status = WriteResponseStatus.PENDING;
+ private EntityUpdateStatus status = EntityUpdateStatus.PENDING;
private String explanation;
private Throwable throwable;
- private WriteEntityResponseImpl(PiHandle handle, PiEntity entity, UpdateType updateType) {
+ private EntityUpdateResponseImpl(PiHandle handle, PiEntity entity, UpdateType updateType) {
this.handle = handle;
this.entity = entity;
this.updateType = updateType;
}
- private WriteEntityResponseImpl withFailure(
- String explanation, WriteResponseStatus status) {
- this.status = status;
+ private EntityUpdateResponseImpl withFailure(
+ String explanation, EntityUpdateStatus status) {
+ setStatus(status);
this.explanation = explanation;
this.throwable = null;
return this;
}
private void setSuccess() {
- this.status = WriteResponseStatus.OK;
+ setStatus(EntityUpdateStatus.OK);
}
private void setFailure(Throwable throwable) {
- this.status = WriteResponseStatus.OTHER_ERROR;
+ setStatus(EntityUpdateStatus.OTHER_ERROR);
this.explanation = throwable.toString();
this.throwable = throwable;
}
+ private void setStatus(EntityUpdateStatus newStatus) {
+ checkState(this.status == EntityUpdateStatus.PENDING,
+ "Cannot set status for non-pending update");
+ checkArgument(newStatus != EntityUpdateStatus.PENDING,
+ "newStatus must be different than pending");
+ this.status = newStatus;
+ }
+
@Override
public PiHandle handle() {
return handle;
@@ -375,11 +391,11 @@
@Override
public boolean isSuccess() {
- return status().equals(WriteResponseStatus.OK);
+ return status().equals(EntityUpdateStatus.OK);
}
@Override
- public WriteResponseStatus status() {
+ public EntityUpdateStatus status() {
return status;
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java
index 7946394..5110274 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java
@@ -91,8 +91,18 @@
.getPreamble().getName()))
.withId(PiActionProfileGroupId.of(msg.getGroupId()))
.withMaxSize(msg.getMaxSize());
- msg.getMembersList().forEach(m -> piGroupBuilder.addMember(
- PiActionProfileMemberId.of(m.getMemberId()), m.getWeight()));
+ msg.getMembersList().forEach(m -> {
+ final int weight;
+ if (m.getWeight() < 1) {
+ log.warn("Decoding group with invalid weight '{}', will set to 1",
+ m.getWeight());
+ weight = 1;
+ } else {
+ weight = m.getWeight();
+ }
+ piGroupBuilder.addMember(
+ PiActionProfileMemberId.of(m.getMemberId()), weight);
+ });
return piGroupBuilder.build();
}
}