Fix inconsistent update type during concurrent P4Runtime writes
This is achieved by optimistically updating the P4Runtime mirror using
the write request (instead of waiting for a response) and by serializing
the building of write requests for the same device.
This change requires updating the P4Runtime protocol classes to expose
the content of the write request.
It also includes:
- force member weight to 1 when reading groups (some server
implementations still fail to comply with the spec)
- remove unused operation timeout handling in GDP (now all RPCs have a
timeout)
Change-Id: Ib4f99a6085c1283f46a2797e0c883d96954e02e9
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java
index 9a47398..d4c3f61 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java
@@ -43,8 +43,8 @@
ReadRequest read(PiPipeconf pipeconf);
/**
- * Abstraction of a P4Runtime read request that follows the builder pattern.
- * Multiple entities can be added to the same request before submitting it.
+ * Abstraction of a batched P4Runtime read request. Multiple entities can be
+ * added to the same request before submitting it.
*/
interface ReadRequest {
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java
index 6d8dbfe..a6f3408 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java
@@ -61,7 +61,7 @@
/**
* Signals if the entity was written successfully or not.
*/
- enum WriteResponseStatus {
+ enum EntityUpdateStatus {
/**
* The entity was written successfully, no errors occurred.
*/
@@ -86,8 +86,8 @@
*/
NOT_FOUND,
/**
- * Other error. See {@link WriteEntityResponse#explanation()} or {@link
- * WriteEntityResponse#throwable()} for more details.
+ * Other error. See {@link EntityUpdateResponse#explanation()} or {@link
+ * EntityUpdateResponse#throwable()} for more details.
*/
OTHER_ERROR,
}
@@ -113,11 +113,10 @@
}
/**
- * Abstraction of a P4Runtime write request that follows the builder
- * pattern. Multiple entities can be added to the same request before
- * submitting it. The implementation should guarantee that entities are
- * added in the final P4Runtime protobuf message in the same order as added
- * in this write request.
+ * Abstraction of a batched P4Runtime write request. Multiple entities can
+ * be added to the same request before submitting it. The implementation
+ * should guarantee that entities are added in the final P4Runtime protobuf
+ * message in the same order as added in this write request.
*/
interface WriteRequest {
@@ -217,12 +216,58 @@
* @return read response
*/
P4RuntimeWriteClient.WriteResponse submitSync();
+
+ /**
+ * Returns all entity update requests for which we are expecting a
+ * response from the device, in the same order they were added to this
+ * batch.
+ *
+ *
+ *
+ * @return entity update requests
+ */
+ Collection<EntityUpdateRequest> pendingUpdates();
+ }
+
+ /**
+ * Represents the update request for a specific entity.
+ */
+ interface EntityUpdateRequest {
+ /**
+ * Returns the handle of the PI entity subject of this update.
+ *
+ * @return handle
+ */
+ PiHandle handle();
+
+ /**
+ * Returns the PI entity subject of this update. Returns {@code null} if
+ * the update type is {@link UpdateType#DELETE}, in which case only the
+ * handle is used in the request.
+ *
+ * @return PI entity or null
+ */
+ PiEntity entity();
+
+ /**
+ * Returns the type of update requested for this entity.
+ *
+ * @return update type
+ */
+ UpdateType updateType();
+
+ /**
+ * Returns the type of entity subject of this update.
+ *
+ * @return PI entity type
+ */
+ PiEntityType entityType();
}
/**
* Abstraction of a response obtained from a P4Runtime server after a write
* request is submitted. It allows returning a detailed response ({@link
- * WriteEntityResponse}) for each PI entity in the original request. Entity
+ * EntityUpdateResponse}) for each PI entity in the batched request. Entity
* responses are guaranteed to be returned in the same order as the
* corresponding PI entity in the request.
*/
@@ -243,73 +288,43 @@
* collection has size equal to the number of PI entities in the
* original write request.
*
- * @return collection of {@link WriteEntityResponse}
+ * @return collection of {@link EntityUpdateResponse}
*/
- Collection<WriteEntityResponse> all();
+ Collection<EntityUpdateResponse> all();
/**
* Returns a detailed response for each PI entity that was successfully
* written. If {@link #isSuccess()} is {@code true}, then this method is
* expected to return the same values as {@link #all()}.
*
- * @return collection of {@link WriteEntityResponse}
+ * @return collection of {@link EntityUpdateResponse}
*/
- Collection<WriteEntityResponse> success();
+ Collection<EntityUpdateResponse> success();
/**
* Returns a detailed response for each PI entity for which the server
* returned an error. If {@link #isSuccess()} is {@code true}, then this
* method is expected to return an empty collection.
*
- * @return collection of {@link WriteEntityResponse}
+ * @return collection of {@link EntityUpdateResponse}
*/
- Collection<WriteEntityResponse> failed();
+ Collection<EntityUpdateResponse> failed();
/**
* Returns a detailed response for each PI entity for which the server
* returned the given status.
*
* @param status status
- * @return collection of {@link WriteEntityResponse}
+ * @return collection of {@link EntityUpdateResponse}
*/
- Collection<WriteEntityResponse> status(WriteResponseStatus status);
+ Collection<EntityUpdateResponse> status(EntityUpdateStatus status);
}
/**
- * Represents the response of a write request for a specific PI entity.
+ * Represents the response to an update request for a specific PI
+ * entity.
*/
- interface WriteEntityResponse {
-
- /**
- * Returns the handle associated with the PI entity.
- *
- * @return handle
- */
- PiHandle handle();
-
- /**
- * Returns the original PI entity as provided in the write request.
- * Returns {@code null} if the update type was {@link
- * UpdateType#DELETE}, in which case only the handle was used in the
- * request.
- *
- * @return PI entity or null
- */
- PiEntity entity();
-
- /**
- * Returns the type of write request performed for this entity.
- *
- * @return update type
- */
- UpdateType updateType();
-
- /**
- * Returns the type of this entity.
- *
- * @return PI entity type
- */
- PiEntityType entityType();
+ interface EntityUpdateResponse extends EntityUpdateRequest {
/**
* Returns true if this PI entity was written successfully, false
@@ -323,13 +338,13 @@
/**
* Returns the status for this PI entity. If {@link #isSuccess()}
* returns {@code true}, then this method is expected to return {@link
- * WriteResponseStatus#OK}. If {@link WriteResponseStatus#OTHER_ERROR}
+ * EntityUpdateStatus#OK}. If {@link EntityUpdateStatus#OTHER_ERROR}
* is returned, further details might be provided in {@link
* #explanation()} and {@link #throwable()}.
*
* @return status
*/
- WriteResponseStatus status();
+ EntityUpdateStatus status();
/**
* If the PI entity was NOT written successfully, this method returns a
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
index 5b5d087..fb3a5fb 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
@@ -27,9 +27,12 @@
import org.slf4j.Logger;
import p4.v1.P4RuntimeOuterClass;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl.SHORT_TIMEOUT_SECONDS;
@@ -49,6 +52,7 @@
private final P4RuntimeClientImpl client;
private final PiPipeconf pipeconf;
+ private final AtomicBoolean submitted = new AtomicBoolean(false);
// The P4Runtime WriteRequest protobuf message we need to populate.
private final P4RuntimeOuterClass.WriteRequest.Builder requestMsg;
// WriteResponse instance builder. We populate entity responses as we add new
@@ -147,7 +151,14 @@
}
@Override
+ public Collection<P4RuntimeWriteClient.EntityUpdateRequest> pendingUpdates() {
+ return responseBuilder.pendingUpdates();
+ }
+
+ @Override
public CompletableFuture<P4RuntimeWriteClient.WriteResponse> submit() {
+ checkState(!submitted.getAndSet(true),
+ "Request has already been submitted, cannot submit again");
final P4RuntimeOuterClass.WriteRequest writeRequest = requestMsg
.setElectionId(client.lastUsedElectionId())
.build();
@@ -191,6 +202,8 @@
private void appendToRequestMsg(P4RuntimeWriteClient.UpdateType updateType,
PiEntity piEntity, PiHandle handle) {
+ checkState(!submitted.get(),
+ "Request has already been submitted, cannot add more entities");
final P4RuntimeOuterClass.Update.Type p4UpdateType;
final P4RuntimeOuterClass.Entity entityMsg;
try {
@@ -213,7 +226,7 @@
} catch (CodecException e) {
responseBuilder.addFailedResponse(
handle, piEntity, updateType, e.getMessage(),
- P4RuntimeWriteClient.WriteResponseStatus.CODEC_ERROR);
+ P4RuntimeWriteClient.EntityUpdateStatus.CODEC_ERROR);
}
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
index f24c717..35ee132 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
@@ -35,9 +35,10 @@
import org.onosproject.net.pi.runtime.PiEntityType;
import org.onosproject.net.pi.runtime.PiHandle;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateResponse;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateStatus;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteEntityResponse;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponseStatus;
import org.slf4j.Logger;
import p4.v1.P4RuntimeOuterClass;
@@ -46,7 +47,9 @@
import java.util.List;
import java.util.Map;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
import static org.slf4j.LoggerFactory.getLogger;
@@ -68,12 +71,12 @@
private static final Logger log = getLogger(WriteResponseImpl.class);
- private final ImmutableList<WriteEntityResponse> entityResponses;
- private final ImmutableListMultimap<WriteResponseStatus, WriteEntityResponse> statusMultimap;
+ private final ImmutableList<EntityUpdateResponse> entityResponses;
+ private final ImmutableListMultimap<EntityUpdateStatus, EntityUpdateResponse> statusMultimap;
private WriteResponseImpl(
- ImmutableList<WriteEntityResponse> allResponses,
- ImmutableListMultimap<WriteResponseStatus, WriteEntityResponse> statusMultimap) {
+ ImmutableList<EntityUpdateResponse> allResponses,
+ ImmutableListMultimap<EntityUpdateStatus, EntityUpdateResponse> statusMultimap) {
this.entityResponses = allResponses;
this.statusMultimap = statusMultimap;
}
@@ -84,25 +87,25 @@
}
@Override
- public Collection<WriteEntityResponse> all() {
+ public Collection<EntityUpdateResponse> all() {
return entityResponses;
}
@Override
- public Collection<WriteEntityResponse> success() {
- return statusMultimap.get(WriteResponseStatus.OK);
+ public Collection<EntityUpdateResponse> success() {
+ return statusMultimap.get(EntityUpdateStatus.OK);
}
@Override
- public Collection<WriteEntityResponse> failed() {
+ public Collection<EntityUpdateResponse> failed() {
return isSuccess()
? Collections.emptyList()
: entityResponses.stream().filter(r -> !r.isSuccess()).collect(toList());
}
@Override
- public Collection<WriteEntityResponse> status(
- WriteResponseStatus status) {
+ public Collection<EntityUpdateResponse> status(
+ EntityUpdateStatus status) {
checkNotNull(status);
return statusMultimap.get(status);
}
@@ -123,11 +126,11 @@
static final class Builder {
private final DeviceId deviceId;
- private final Map<Integer, WriteEntityResponseImpl> pendingResponses =
+ private final Map<Integer, EntityUpdateResponseImpl> pendingResponses =
Maps.newHashMap();
- private final List<WriteEntityResponse> allResponses =
+ private final List<EntityUpdateResponse> allResponses =
Lists.newArrayList();
- private final ListMultimap<WriteResponseStatus, WriteEntityResponse> statusMap =
+ private final ListMultimap<EntityUpdateStatus, EntityUpdateResponse> statusMap =
ArrayListMultimap.create();
private Builder(DeviceId deviceId) {
@@ -136,7 +139,7 @@
void addPendingResponse(PiHandle handle, PiEntity entity, UpdateType updateType) {
synchronized (this) {
- final WriteEntityResponseImpl resp = new WriteEntityResponseImpl(
+ final EntityUpdateResponseImpl resp = new EntityUpdateResponseImpl(
handle, entity, updateType);
allResponses.add(resp);
pendingResponses.put(pendingResponses.size(), resp);
@@ -144,21 +147,28 @@
}
void addFailedResponse(PiHandle handle, PiEntity entity, UpdateType updateType,
- String explanation, WriteResponseStatus status) {
+ String explanation, EntityUpdateStatus status) {
synchronized (this) {
- final WriteEntityResponseImpl resp = new WriteEntityResponseImpl(
+ final EntityUpdateResponseImpl resp = new EntityUpdateResponseImpl(
handle, entity, updateType)
.withFailure(explanation, status);
allResponses.add(resp);
}
}
+ Collection<EntityUpdateRequest> pendingUpdates() {
+ return ImmutableList.copyOf(pendingResponses.values());
+ }
+
WriteResponseImpl buildAsIs() {
synchronized (this) {
- if (!pendingResponses.isEmpty()) {
+ final long pendingCount = pendingResponses.values().stream()
+ .filter(r -> r.status() == EntityUpdateStatus.PENDING)
+ .count();
+ if (pendingCount > 0) {
log.warn("Partial response from {}, {} of {} total " +
- "entities are in status PENDING",
- deviceId, pendingResponses.size(), allResponses.size());
+ "updates are still in status PENDING",
+ deviceId, pendingCount, allResponses.size());
}
return new WriteResponseImpl(
ImmutableList.copyOf(allResponses),
@@ -169,7 +179,6 @@
WriteResponseImpl setSuccessAllAndBuild() {
synchronized (this) {
pendingResponses.values().forEach(this::doSetSuccess);
- pendingResponses.clear();
return buildAsIs();
}
}
@@ -177,7 +186,6 @@
WriteResponseImpl setFailAllAndBuild(Throwable throwable) {
synchronized (this) {
pendingResponses.values().forEach(r -> r.setFailure(throwable));
- pendingResponses.clear();
return buildAsIs();
}
}
@@ -190,8 +198,8 @@
private void setSuccess(int index) {
synchronized (this) {
- final WriteEntityResponseImpl resp = pendingResponses.remove(index);
- if (resp != null) {
+ final EntityUpdateResponseImpl resp = pendingResponses.get(index);
+ if (resp != null && resp.status == EntityUpdateStatus.PENDING) {
doSetSuccess(resp);
} else {
log.error("Missing pending response at index {}", index);
@@ -199,16 +207,16 @@
}
}
- private void doSetSuccess(WriteEntityResponseImpl resp) {
+ private void doSetSuccess(EntityUpdateResponseImpl resp) {
resp.setSuccess();
- statusMap.put(WriteResponseStatus.OK, resp);
+ statusMap.put(EntityUpdateStatus.OK, resp);
}
private void setFailure(int index,
String explanation,
- WriteResponseStatus status) {
+ EntityUpdateStatus status) {
synchronized (this) {
- final WriteEntityResponseImpl resp = pendingResponses.remove(index);
+ final EntityUpdateResponseImpl resp = pendingResponses.get(index);
if (resp != null) {
resp.withFailure(explanation, status);
statusMap.put(status, resp);
@@ -274,7 +282,7 @@
"P4Runtime Error message format not recognized [%s]",
TextFormat.shortDebugString(any));
if (reconcilable) {
- setFailure(index, unpackErr, WriteResponseStatus.OTHER_ERROR);
+ setFailure(index, unpackErr, EntityUpdateStatus.OTHER_ERROR);
} else {
log.warn(unpackErr);
}
@@ -283,7 +291,7 @@
// Map gRPC status codes to our WriteResponseStatus codes.
final Status.Code p4Code = Status.fromCodeValue(
p4Error.getCanonicalCode()).getCode();
- final WriteResponseStatus ourCode;
+ final EntityUpdateStatus ourCode;
switch (p4Code) {
case OK:
if (reconcilable) {
@@ -291,17 +299,17 @@
}
return;
case NOT_FOUND:
- ourCode = WriteResponseStatus.NOT_FOUND;
+ ourCode = EntityUpdateStatus.NOT_FOUND;
break;
case ALREADY_EXISTS:
- ourCode = WriteResponseStatus.ALREADY_EXIST;
+ ourCode = EntityUpdateStatus.ALREADY_EXIST;
break;
default:
- ourCode = WriteResponseStatus.OTHER_ERROR;
+ ourCode = EntityUpdateStatus.OTHER_ERROR;
break;
}
// Put the p4Code in the explanation only if ourCode is OTHER_ERROR.
- final String explanationCode = ourCode == WriteResponseStatus.OTHER_ERROR
+ final String explanationCode = ourCode == EntityUpdateStatus.OTHER_ERROR
? p4Code.name() + " " : "";
final String details = p4Error.hasDetails()
? ", " + p4Error.getDetails().toString() : "";
@@ -317,42 +325,50 @@
}
/**
- * Internal implementation of WriteEntityResponse.
+ * Internal implementation of EntityUpdateResponse.
*/
- private static final class WriteEntityResponseImpl implements WriteEntityResponse {
+ private static final class EntityUpdateResponseImpl implements EntityUpdateResponse {
private final PiHandle handle;
private final PiEntity entity;
private final UpdateType updateType;
- private WriteResponseStatus status = WriteResponseStatus.PENDING;
+ private EntityUpdateStatus status = EntityUpdateStatus.PENDING;
private String explanation;
private Throwable throwable;
- private WriteEntityResponseImpl(PiHandle handle, PiEntity entity, UpdateType updateType) {
+ private EntityUpdateResponseImpl(PiHandle handle, PiEntity entity, UpdateType updateType) {
this.handle = handle;
this.entity = entity;
this.updateType = updateType;
}
- private WriteEntityResponseImpl withFailure(
- String explanation, WriteResponseStatus status) {
- this.status = status;
+ private EntityUpdateResponseImpl withFailure(
+ String explanation, EntityUpdateStatus status) {
+ setStatus(status);
this.explanation = explanation;
this.throwable = null;
return this;
}
private void setSuccess() {
- this.status = WriteResponseStatus.OK;
+ setStatus(EntityUpdateStatus.OK);
}
private void setFailure(Throwable throwable) {
- this.status = WriteResponseStatus.OTHER_ERROR;
+ setStatus(EntityUpdateStatus.OTHER_ERROR);
this.explanation = throwable.toString();
this.throwable = throwable;
}
+ private void setStatus(EntityUpdateStatus newStatus) {
+ checkState(this.status == EntityUpdateStatus.PENDING,
+ "Cannot set status for non-pending update");
+ checkArgument(newStatus != EntityUpdateStatus.PENDING,
+ "newStatus must be different than pending");
+ this.status = newStatus;
+ }
+
@Override
public PiHandle handle() {
return handle;
@@ -375,11 +391,11 @@
@Override
public boolean isSuccess() {
- return status().equals(WriteResponseStatus.OK);
+ return status().equals(EntityUpdateStatus.OK);
}
@Override
- public WriteResponseStatus status() {
+ public EntityUpdateStatus status() {
return status;
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java
index 7946394..5110274 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java
@@ -91,8 +91,18 @@
.getPreamble().getName()))
.withId(PiActionProfileGroupId.of(msg.getGroupId()))
.withMaxSize(msg.getMaxSize());
- msg.getMembersList().forEach(m -> piGroupBuilder.addMember(
- PiActionProfileMemberId.of(m.getMemberId()), m.getWeight()));
+ msg.getMembersList().forEach(m -> {
+ final int weight;
+ if (m.getWeight() < 1) {
+ log.warn("Decoding group with invalid weight '{}', will set to 1",
+ m.getWeight());
+ weight = 1;
+ } else {
+ weight = m.getWeight();
+ }
+ piGroupBuilder.addMember(
+ PiActionProfileMemberId.of(m.getMemberId()), weight);
+ });
return piGroupBuilder.build();
}
}