Fix inconsistent update type during concurrent P4Runtime writes
This is achieved by optimistically updating the P4Runtime mirror using
the write request (instead of waiting for a response) and by serializing
building write requests for the same device.
This change requires updating the P4Runtime protocol classes to expose
the content of the write request.
It also includes:
- force member weight to 1 when reading groups (some server
implementation still fails to be compliant to the spec)
- remove unused operation timeout handling in GDP (now all RPCz have a
timeout)
Change-Id: Ib4f99a6085c1283f46a2797e0c883d96954e02e9
diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiActionProfileGroup.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiActionProfileGroup.java
index 57e65a2..3fd7e08 100644
--- a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiActionProfileGroup.java
+++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiActionProfileGroup.java
@@ -125,7 +125,7 @@
return MoreObjects.toStringHelper(this)
.add("actionProfile", actionProfileId)
.add("id", groupId)
- .add("members", members)
+ .add("members", members.values())
.add("maxSize", maxSize)
.toString();
}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
index fca05cb..01af5eb 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
@@ -18,7 +18,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.onlab.util.SharedExecutors;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Striped;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileGroupMirror;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileMemberMirror;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMirror;
@@ -43,7 +44,8 @@
import org.onosproject.net.pi.service.PiTranslatedEntity;
import org.onosproject.net.pi.service.PiTranslationException;
import org.onosproject.p4runtime.api.P4RuntimeReadClient;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
import java.util.Collection;
import java.util.Collections;
@@ -52,6 +54,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toMap;
@@ -70,6 +74,10 @@
private static final String READ_ACTION_GROUPS_FROM_MIRROR = "actionGroupReadFromMirror";
private static final boolean DEFAULT_READ_ACTION_GROUPS_FROM_MIRROR = false;
+ // Used to make sure concurrent calls to write groups are serialized so
+ // that each request gets consistent access to mirror state.
+ private static final Striped<Lock> WRITE_LOCKS = Striped.lock(30);
+
protected GroupStore groupStore;
private P4RuntimeActionProfileGroupMirror groupMirror;
private P4RuntimeActionProfileMemberMirror memberMirror;
@@ -180,11 +188,23 @@
log.warn("Cleaning up {} action profile groups and " +
"{} members on {}...",
groupHandlesToRemove.size(), memberHandlesToRemove.size(), deviceId);
- SharedExecutors.getSingleThreadExecutor().execute(
- () -> submitWriteRequestAndUpdateMirror(
- client.write(pipeconf)
- .delete(groupHandlesToRemove)
- .delete(memberHandlesToRemove)));
+ final WriteRequest deleteRequest = client.write(pipeconf)
+ .delete(groupHandlesToRemove)
+ .delete(memberHandlesToRemove);
+ WRITE_LOCKS.get(deviceId).lock();
+ try {
+ updateMirrorAndSubmit(deleteRequest).whenComplete((r, ex) -> {
+ if (ex != null) {
+ log.error("Exception removing inconsistent group/members", ex);
+ } else {
+ log.debug("Completed removal of inconsistent " +
+ "groups/members ({} of {} updates succeeded)",
+ r.success(), r.all());
+ }
+ });
+ } finally {
+ WRITE_LOCKS.get(deviceId).unlock();
+ }
}
// Done.
@@ -294,30 +314,34 @@
if (members == null) {
return false;
}
- final P4RuntimeWriteClient.WriteRequest request = client.write(pipeconf);
- // FIXME: when operation is remove, should we remove members first? Same
- // thing when modifying a group, should we first modify the group then
- // remove the member?
- final boolean allMembersSkipped = members.stream()
- .allMatch(m -> appendEntityToWriteRequestOrSkip(
- request, m.handle(deviceId), m, memberMirror, operation));
- final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
- request, groupHandle, group, groupMirror, operation);
- if (allMembersSkipped && groupSkipped) {
- return true;
- } else {
- // True if all entities in the request (groups and members) where
- // written successfully.
- return submitWriteRequestAndUpdateMirror(request).isSuccess();
+ final WriteRequest request = client.write(pipeconf);
+ final CompletableFuture<WriteResponse> futureResponse;
+ WRITE_LOCKS.get(deviceId).lock();
+ try {
+ // FIXME: when operation is remove, should we remove members first? Same
+ // thing when modifying a group, should we first modify the group then
+ // remove the member?
+ final boolean allMembersSkipped = members.stream()
+ .allMatch(m -> appendEntityToWriteRequestOrSkip(
+ request, m.handle(deviceId), m, memberMirror, operation));
+ final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
+ request, groupHandle, group, groupMirror, operation);
+ if (allMembersSkipped && groupSkipped) {
+ return true;
+ } else {
+ futureResponse = updateMirrorAndSubmit(request);
+ }
+ } finally {
+ WRITE_LOCKS.get(deviceId).unlock();
}
+ // Wait response from device. Returns true if all entities in the
+ // request (groups and members) were written successfully.
+ return Futures.getUnchecked(futureResponse).isSuccess();
}
private <H extends PiHandle, E extends PiEntity> boolean appendEntityToWriteRequestOrSkip(
- P4RuntimeWriteClient.WriteRequest writeRequest,
- H handle,
- E entityToApply,
- P4RuntimeMirror<H, E> mirror,
- Operation operation) {
+ WriteRequest writeRequest, H handle, E entityToApply,
+ P4RuntimeMirror<H, E> mirror, Operation operation) {
// Should return true if there's no need to write entity on device,
// false if the write request is modified or an error occurs.
final TimedEntry<E> entityOnDevice = mirror.get(handle);
@@ -362,12 +386,10 @@
return instances;
}
- private P4RuntimeWriteClient.WriteResponse submitWriteRequestAndUpdateMirror(
- P4RuntimeWriteClient.WriteRequest request) {
- final P4RuntimeWriteClient.WriteResponse response = request.submitSync();
- groupMirror.replayWriteResponse(response);
- memberMirror.replayWriteResponse(response);
- return response;
+ private CompletableFuture<WriteResponse> updateMirrorAndSubmit(WriteRequest request) {
+ groupMirror.applyWriteRequest(request);
+ memberMirror.applyWriteRequest(request);
+ return request.submit();
}
enum Operation {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index f905c4c..8a1f99a 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -19,7 +19,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.onlab.util.SharedExecutors;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Striped;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeTableMirror;
import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
import org.onosproject.net.flow.DefaultFlowEntry;
@@ -42,8 +43,9 @@
import org.onosproject.net.pi.service.PiTranslatedEntity;
import org.onosproject.net.pi.service.PiTranslationException;
import org.onosproject.p4runtime.api.P4RuntimeReadClient;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
import java.util.Collection;
import java.util.Collections;
@@ -52,6 +54,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import static org.onosproject.drivers.p4runtime.P4RuntimeFlowRuleProgrammable.Operation.APPLY;
@@ -97,6 +101,10 @@
private static final String TABLE_DEFAULT_AS_ENTRY = "tableDefaultAsEntry";
private static final boolean DEFAULT_TABLE_DEFAULT_AS_ENTRY = false;
+ // Used to make sure concurrent calls to write flow rules are serialized so
+ // that each request gets consistent access to mirror state.
+ private static final Striped<Lock> WRITE_LOCKS = Striped.lock(30);
+
private PiPipelineModel pipelineModel;
private P4RuntimeTableMirror tableMirror;
private PiFlowRuleTranslator translator;
@@ -165,8 +173,25 @@
if (!inconsistentEntries.isEmpty()) {
// Trigger clean up of inconsistent entries.
- SharedExecutors.getSingleThreadExecutor().execute(
- () -> cleanUpInconsistentEntries(inconsistentEntries));
+ log.warn("Found {} inconsistent table entries on {}, removing them...",
+ inconsistentEntries.size(), deviceId);
+ final WriteRequest request = client.write(pipeconf)
+ .entities(inconsistentEntries, DELETE);
+ WRITE_LOCKS.get(deviceId).lock();
+ // Update mirror and async submit delete request.
+ try {
+ tableMirror.applyWriteRequest(request);
+ request.submit().whenComplete((response, ex) -> {
+ if (ex != null) {
+ log.error("Exception removing inconsistent table entries", ex);
+ } else {
+ log.debug("Successfully removed {} out of {} inconsistent entries",
+ response.success().size(), response.all().size());
+ }
+ });
+ } finally {
+ WRITE_LOCKS.get(deviceId).unlock();
+ }
}
return result.build();
@@ -246,48 +271,48 @@
.collect(Collectors.toList());
}
- private void cleanUpInconsistentEntries(Collection<PiTableEntry> piEntries) {
- log.warn("Found {} inconsistent table entries on {}, removing them...",
- piEntries.size(), deviceId);
- // Remove entries and update mirror.
- tableMirror.replayWriteResponse(
- client.write(pipeconf).entities(piEntries, DELETE).submitSync());
- }
-
private Collection<FlowRule> processFlowRules(Collection<FlowRule> rules,
Operation driverOperation) {
if (!setupBehaviour() || rules.isEmpty()) {
return Collections.emptyList();
}
// Created batched write request.
- final P4RuntimeWriteClient.WriteRequest request = client.write(pipeconf);
+ final WriteRequest request = client.write(pipeconf);
// For each rule, translate to PI and append to write request.
final Map<PiHandle, FlowRule> handleToRuleMap = Maps.newHashMap();
final List<FlowRule> skippedRules = Lists.newArrayList();
- for (FlowRule rule : rules) {
- final PiTableEntry entry;
- try {
- entry = translator.translate(rule, pipeconf);
- } catch (PiTranslationException e) {
- log.warn("Unable to translate flow rule for pipeconf '{}': {} [{}]",
- pipeconf.id(), e.getMessage(), rule);
- // Next rule.
- continue;
+ final CompletableFuture<WriteResponse> futureResponse;
+ WRITE_LOCKS.get(deviceId).lock();
+ try {
+ for (FlowRule rule : rules) {
+ final PiTableEntry entry;
+ try {
+ entry = translator.translate(rule, pipeconf);
+ } catch (PiTranslationException e) {
+ log.warn("Unable to translate flow rule for pipeconf '{}': {} [{}]",
+ pipeconf.id(), e.getMessage(), rule);
+ // Next rule.
+ continue;
+ }
+ final PiTableEntryHandle handle = entry.handle(deviceId);
+ handleToRuleMap.put(handle, rule);
+ // Append entry to batched write request (returns false), or skip (true)
+ if (appendEntryToWriteRequestOrSkip(
+ request, handle, entry, driverOperation)) {
+ skippedRules.add(rule);
+ updateTranslationStore(
+ driverOperation, handle, rule, entry);
+ }
}
- final PiTableEntryHandle handle = entry.handle(deviceId);
- handleToRuleMap.put(handle, rule);
- // Append entry to batched write request (returns false), or skip (true)
- if (appendEntryToWriteRequestOrSkip(
- request, handle, entry, driverOperation)) {
- skippedRules.add(rule);
- updateTranslationStore(
- driverOperation, handle, rule, entry);
- }
+ // Update mirror.
+ tableMirror.applyWriteRequest(request);
+ // Async submit request to server.
+ futureResponse = request.submit();
+ } finally {
+ WRITE_LOCKS.get(deviceId).unlock();
}
- // Submit request to server.
- final P4RuntimeWriteClient.WriteResponse response = request.submitSync();
- // Update mirror.
- tableMirror.replayWriteResponse(response);
+ // Wait for response.
+ final WriteResponse response = Futures.getUnchecked(futureResponse);
// Derive successfully applied flow rule from response.
final List<FlowRule> appliedRules = getAppliedFlowRulesAndUpdateTranslator(
response, handleToRuleMap, driverOperation);
@@ -297,7 +322,7 @@
}
private List<FlowRule> getAppliedFlowRulesAndUpdateTranslator(
- P4RuntimeWriteClient.WriteResponse response,
+ WriteResponse response,
Map<PiHandle, FlowRule> handleToFlowRuleMap,
Operation driverOperation) {
// Returns a list of flow rules that were successfully written on the
@@ -359,7 +384,7 @@
}
private boolean appendEntryToWriteRequestOrSkip(
- final P4RuntimeWriteClient.WriteRequest writeRequest,
+ final WriteRequest writeRequest,
final PiTableEntryHandle handle,
PiTableEntry piEntryToApply,
final Operation driverOperation) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
index fb9451c..4daeaaa 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
@@ -234,9 +234,9 @@
@Override
@SuppressWarnings("unchecked")
- public void replayWriteResponse(P4RuntimeWriteClient.WriteResponse response) {
- response.success().stream()
- .filter(r -> r.entityType().equals(this.entityType) && r.isSuccess())
+ public void applyWriteRequest(P4RuntimeWriteClient.WriteRequest request) {
+ request.pendingUpdates().stream()
+ .filter(r -> r.entityType().equals(this.entityType))
.forEach(r -> {
switch (r.updateType()) {
case INSERT:
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
index bee8c51..b836455 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
@@ -94,16 +94,17 @@
/**
* Synchronizes the state of the given device ID with the given collection
* of PI entities.
+ *
* @param deviceId device ID
* @param entities collection of PI entities
*/
void sync(DeviceId deviceId, Collection<E> entities);
/**
- * Uses the given P4Runtime write response to update the state of this
- * mirror.
+ * Uses the given P4Runtime write request to update the state of this
+ * mirror by optimistically assuming that all updates in it will succeed.
*
- * @param response P4Runtime write response
+ * @param request P4Runtime write request
*/
- void replayWriteResponse(P4RuntimeWriteClient.WriteResponse response);
+ void applyWriteRequest(P4RuntimeWriteClient.WriteRequest request);
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java
index 9a47398..d4c3f61 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java
@@ -43,8 +43,8 @@
ReadRequest read(PiPipeconf pipeconf);
/**
- * Abstraction of a P4Runtime read request that follows the builder pattern.
- * Multiple entities can be added to the same request before submitting it.
+ * Abstraction of a batched P4Runtime read request. Multiple entities can be
+ * added to the same request before submitting it.
*/
interface ReadRequest {
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java
index 6d8dbfe..a6f3408 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java
@@ -61,7 +61,7 @@
/**
* Signals if the entity was written successfully or not.
*/
- enum WriteResponseStatus {
+ enum EntityUpdateStatus {
/**
* The entity was written successfully, no errors occurred.
*/
@@ -86,8 +86,8 @@
*/
NOT_FOUND,
/**
- * Other error. See {@link WriteEntityResponse#explanation()} or {@link
- * WriteEntityResponse#throwable()} for more details.
+ * Other error. See {@link EntityUpdateResponse#explanation()} or {@link
+ * EntityUpdateResponse#throwable()} for more details.
*/
OTHER_ERROR,
}
@@ -113,11 +113,10 @@
}
/**
- * Abstraction of a P4Runtime write request that follows the builder
- * pattern. Multiple entities can be added to the same request before
- * submitting it. The implementation should guarantee that entities are
- * added in the final P4Runtime protobuf message in the same order as added
- * in this write request.
+ * Abstraction of a batched P4Runtime write request. Multiple entities can
+ * be added to the same request before submitting it. The implementation
+ * should guarantee that entities are added in the final P4Runtime protobuf
+ * message in the same order as added in this write request.
*/
interface WriteRequest {
@@ -217,12 +216,58 @@
* @return read response
*/
P4RuntimeWriteClient.WriteResponse submitSync();
+
+ /**
+ * Returns all entity update requests for which we are expecting a
+ * responce from the device, in the same order they were added to this
+ * batch.
+ *
+ *
+ *
+ * @return entity update requests
+ */
+ Collection<EntityUpdateRequest> pendingUpdates();
+ }
+
+ /**
+ * Represents the update request for a specific entity.
+ */
+ interface EntityUpdateRequest {
+ /**
+ * Returns the handle of the PI entity subject of this update.
+ *
+ * @return handle
+ */
+ PiHandle handle();
+
+ /**
+ * Returns the PI entity subject of this update. Returns {@code null} if
+ * the update type is {@link UpdateType#DELETE}, in which case only the
+ * handle is used in the request.
+ *
+ * @return PI entity or null
+ */
+ PiEntity entity();
+
+ /**
+ * Returns the type of update requested for this entity.
+ *
+ * @return update type
+ */
+ UpdateType updateType();
+
+ /**
+ * Returns the type of entity subject of this update.
+ *
+ * @return PI entity type
+ */
+ PiEntityType entityType();
}
/**
* Abstraction of a response obtained from a P4Runtime server after a write
* request is submitted. It allows returning a detailed response ({@link
- * WriteEntityResponse}) for each PI entity in the original request. Entity
+ * EntityUpdateResponse}) for each PI entity in the batched request. Entity
* responses are guaranteed to be returned in the same order as the
* corresponding PI entity in the request.
*/
@@ -243,73 +288,43 @@
* collection has size equal to the number of PI entities in the
* original write request.
*
- * @return collection of {@link WriteEntityResponse}
+ * @return collection of {@link EntityUpdateResponse}
*/
- Collection<WriteEntityResponse> all();
+ Collection<EntityUpdateResponse> all();
/**
* Returns a detailed response for each PI entity that was successfully
* written. If {@link #isSuccess()} is {@code true}, then this method is
* expected to return the same values as {@link #all()}.
*
- * @return collection of {@link WriteEntityResponse}
+ * @return collection of {@link EntityUpdateResponse}
*/
- Collection<WriteEntityResponse> success();
+ Collection<EntityUpdateResponse> success();
/**
* Returns a detailed response for each PI entity for which the server
* returned an error. If {@link #isSuccess()} is {@code true}, then this
* method is expected to return an empty collection.
*
- * @return collection of {@link WriteEntityResponse}
+ * @return collection of {@link EntityUpdateResponse}
*/
- Collection<WriteEntityResponse> failed();
+ Collection<EntityUpdateResponse> failed();
/**
* Returns a detailed response for each PI entity for which the server
* returned the given status.
*
* @param status status
- * @return collection of {@link WriteEntityResponse}
+ * @return collection of {@link EntityUpdateResponse}
*/
- Collection<WriteEntityResponse> status(WriteResponseStatus status);
+ Collection<EntityUpdateResponse> status(EntityUpdateStatus status);
}
/**
- * Represents the response of a write request for a specific PI entity.
+ * Represents the response to an update request request for a specific PI
+ * entity.
*/
- interface WriteEntityResponse {
-
- /**
- * Returns the handle associated with the PI entity.
- *
- * @return handle
- */
- PiHandle handle();
-
- /**
- * Returns the original PI entity as provided in the write request.
- * Returns {@code null} if the update type was {@link
- * UpdateType#DELETE}, in which case only the handle was used in the
- * request.
- *
- * @return PI entity or null
- */
- PiEntity entity();
-
- /**
- * Returns the type of write request performed for this entity.
- *
- * @return update type
- */
- UpdateType updateType();
-
- /**
- * Returns the type of this entity.
- *
- * @return PI entity type
- */
- PiEntityType entityType();
+ interface EntityUpdateResponse extends EntityUpdateRequest {
/**
* Returns true if this PI entity was written successfully, false
@@ -323,13 +338,13 @@
/**
* Returns the status for this PI entity. If {@link #isSuccess()}
* returns {@code true}, then this method is expected to return {@link
- * WriteResponseStatus#OK}. If {@link WriteResponseStatus#OTHER_ERROR}
+ * EntityUpdateStatus#OK}. If {@link EntityUpdateStatus#OTHER_ERROR}
* is returned, further details might be provided in {@link
* #explanation()} and {@link #throwable()}.
*
* @return status
*/
- WriteResponseStatus status();
+ EntityUpdateStatus status();
/**
* If the PI entity was NOT written successfully, this method returns a
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
index 5b5d087..fb3a5fb 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
@@ -27,9 +27,12 @@
import org.slf4j.Logger;
import p4.v1.P4RuntimeOuterClass;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl.SHORT_TIMEOUT_SECONDS;
@@ -49,6 +52,7 @@
private final P4RuntimeClientImpl client;
private final PiPipeconf pipeconf;
+ private final AtomicBoolean submitted = new AtomicBoolean(false);
// The P4Runtime WriteRequest protobuf message we need to populate.
private final P4RuntimeOuterClass.WriteRequest.Builder requestMsg;
// WriteResponse instance builder. We populate entity responses as we add new
@@ -147,7 +151,14 @@
}
@Override
+ public Collection<P4RuntimeWriteClient.EntityUpdateRequest> pendingUpdates() {
+ return responseBuilder.pendingUpdates();
+ }
+
+ @Override
public CompletableFuture<P4RuntimeWriteClient.WriteResponse> submit() {
+ checkState(!submitted.getAndSet(true),
+ "Request has already been submitted, cannot submit again");
final P4RuntimeOuterClass.WriteRequest writeRequest = requestMsg
.setElectionId(client.lastUsedElectionId())
.build();
@@ -191,6 +202,8 @@
private void appendToRequestMsg(P4RuntimeWriteClient.UpdateType updateType,
PiEntity piEntity, PiHandle handle) {
+ checkState(!submitted.get(),
+ "Request has already been submitted, cannot add more entities");
final P4RuntimeOuterClass.Update.Type p4UpdateType;
final P4RuntimeOuterClass.Entity entityMsg;
try {
@@ -213,7 +226,7 @@
} catch (CodecException e) {
responseBuilder.addFailedResponse(
handle, piEntity, updateType, e.getMessage(),
- P4RuntimeWriteClient.WriteResponseStatus.CODEC_ERROR);
+ P4RuntimeWriteClient.EntityUpdateStatus.CODEC_ERROR);
}
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
index f24c717..35ee132 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
@@ -35,9 +35,10 @@
import org.onosproject.net.pi.runtime.PiEntityType;
import org.onosproject.net.pi.runtime.PiHandle;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateResponse;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateStatus;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteEntityResponse;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponseStatus;
import org.slf4j.Logger;
import p4.v1.P4RuntimeOuterClass;
@@ -46,7 +47,9 @@
import java.util.List;
import java.util.Map;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
import static org.slf4j.LoggerFactory.getLogger;
@@ -68,12 +71,12 @@
private static final Logger log = getLogger(WriteResponseImpl.class);
- private final ImmutableList<WriteEntityResponse> entityResponses;
- private final ImmutableListMultimap<WriteResponseStatus, WriteEntityResponse> statusMultimap;
+ private final ImmutableList<EntityUpdateResponse> entityResponses;
+ private final ImmutableListMultimap<EntityUpdateStatus, EntityUpdateResponse> statusMultimap;
private WriteResponseImpl(
- ImmutableList<WriteEntityResponse> allResponses,
- ImmutableListMultimap<WriteResponseStatus, WriteEntityResponse> statusMultimap) {
+ ImmutableList<EntityUpdateResponse> allResponses,
+ ImmutableListMultimap<EntityUpdateStatus, EntityUpdateResponse> statusMultimap) {
this.entityResponses = allResponses;
this.statusMultimap = statusMultimap;
}
@@ -84,25 +87,25 @@
}
@Override
- public Collection<WriteEntityResponse> all() {
+ public Collection<EntityUpdateResponse> all() {
return entityResponses;
}
@Override
- public Collection<WriteEntityResponse> success() {
- return statusMultimap.get(WriteResponseStatus.OK);
+ public Collection<EntityUpdateResponse> success() {
+ return statusMultimap.get(EntityUpdateStatus.OK);
}
@Override
- public Collection<WriteEntityResponse> failed() {
+ public Collection<EntityUpdateResponse> failed() {
return isSuccess()
? Collections.emptyList()
: entityResponses.stream().filter(r -> !r.isSuccess()).collect(toList());
}
@Override
- public Collection<WriteEntityResponse> status(
- WriteResponseStatus status) {
+ public Collection<EntityUpdateResponse> status(
+ EntityUpdateStatus status) {
checkNotNull(status);
return statusMultimap.get(status);
}
@@ -123,11 +126,11 @@
static final class Builder {
private final DeviceId deviceId;
- private final Map<Integer, WriteEntityResponseImpl> pendingResponses =
+ private final Map<Integer, EntityUpdateResponseImpl> pendingResponses =
Maps.newHashMap();
- private final List<WriteEntityResponse> allResponses =
+ private final List<EntityUpdateResponse> allResponses =
Lists.newArrayList();
- private final ListMultimap<WriteResponseStatus, WriteEntityResponse> statusMap =
+ private final ListMultimap<EntityUpdateStatus, EntityUpdateResponse> statusMap =
ArrayListMultimap.create();
private Builder(DeviceId deviceId) {
@@ -136,7 +139,7 @@
void addPendingResponse(PiHandle handle, PiEntity entity, UpdateType updateType) {
synchronized (this) {
- final WriteEntityResponseImpl resp = new WriteEntityResponseImpl(
+ final EntityUpdateResponseImpl resp = new EntityUpdateResponseImpl(
handle, entity, updateType);
allResponses.add(resp);
pendingResponses.put(pendingResponses.size(), resp);
@@ -144,21 +147,28 @@
}
void addFailedResponse(PiHandle handle, PiEntity entity, UpdateType updateType,
- String explanation, WriteResponseStatus status) {
+ String explanation, EntityUpdateStatus status) {
synchronized (this) {
- final WriteEntityResponseImpl resp = new WriteEntityResponseImpl(
+ final EntityUpdateResponseImpl resp = new EntityUpdateResponseImpl(
handle, entity, updateType)
.withFailure(explanation, status);
allResponses.add(resp);
}
}
+ Collection<EntityUpdateRequest> pendingUpdates() {
+ return ImmutableList.copyOf(pendingResponses.values());
+ }
+
WriteResponseImpl buildAsIs() {
synchronized (this) {
- if (!pendingResponses.isEmpty()) {
+ final long pendingCount = pendingResponses.values().stream()
+ .filter(r -> r.status() == EntityUpdateStatus.PENDING)
+ .count();
+ if (pendingCount > 0) {
log.warn("Partial response from {}, {} of {} total " +
- "entities are in status PENDING",
- deviceId, pendingResponses.size(), allResponses.size());
+ "updates are still in status PENDING",
+ deviceId, pendingCount, allResponses.size());
}
return new WriteResponseImpl(
ImmutableList.copyOf(allResponses),
@@ -169,7 +179,6 @@
WriteResponseImpl setSuccessAllAndBuild() {
synchronized (this) {
pendingResponses.values().forEach(this::doSetSuccess);
- pendingResponses.clear();
return buildAsIs();
}
}
@@ -177,7 +186,6 @@
WriteResponseImpl setFailAllAndBuild(Throwable throwable) {
synchronized (this) {
pendingResponses.values().forEach(r -> r.setFailure(throwable));
- pendingResponses.clear();
return buildAsIs();
}
}
@@ -190,8 +198,8 @@
private void setSuccess(int index) {
synchronized (this) {
- final WriteEntityResponseImpl resp = pendingResponses.remove(index);
- if (resp != null) {
+ final EntityUpdateResponseImpl resp = pendingResponses.get(index);
+ if (resp != null && resp.status == EntityUpdateStatus.PENDING) {
doSetSuccess(resp);
} else {
log.error("Missing pending response at index {}", index);
@@ -199,16 +207,16 @@
}
}
- private void doSetSuccess(WriteEntityResponseImpl resp) {
+ private void doSetSuccess(EntityUpdateResponseImpl resp) {
resp.setSuccess();
- statusMap.put(WriteResponseStatus.OK, resp);
+ statusMap.put(EntityUpdateStatus.OK, resp);
}
private void setFailure(int index,
String explanation,
- WriteResponseStatus status) {
+ EntityUpdateStatus status) {
synchronized (this) {
- final WriteEntityResponseImpl resp = pendingResponses.remove(index);
+ final EntityUpdateResponseImpl resp = pendingResponses.get(index);
if (resp != null) {
resp.withFailure(explanation, status);
statusMap.put(status, resp);
@@ -274,7 +282,7 @@
"P4Runtime Error message format not recognized [%s]",
TextFormat.shortDebugString(any));
if (reconcilable) {
- setFailure(index, unpackErr, WriteResponseStatus.OTHER_ERROR);
+ setFailure(index, unpackErr, EntityUpdateStatus.OTHER_ERROR);
} else {
log.warn(unpackErr);
}
@@ -283,7 +291,7 @@
// Map gRPC status codes to our WriteResponseStatus codes.
final Status.Code p4Code = Status.fromCodeValue(
p4Error.getCanonicalCode()).getCode();
- final WriteResponseStatus ourCode;
+ final EntityUpdateStatus ourCode;
switch (p4Code) {
case OK:
if (reconcilable) {
@@ -291,17 +299,17 @@
}
return;
case NOT_FOUND:
- ourCode = WriteResponseStatus.NOT_FOUND;
+ ourCode = EntityUpdateStatus.NOT_FOUND;
break;
case ALREADY_EXISTS:
- ourCode = WriteResponseStatus.ALREADY_EXIST;
+ ourCode = EntityUpdateStatus.ALREADY_EXIST;
break;
default:
- ourCode = WriteResponseStatus.OTHER_ERROR;
+ ourCode = EntityUpdateStatus.OTHER_ERROR;
break;
}
// Put the p4Code in the explanation only if ourCode is OTHER_ERROR.
- final String explanationCode = ourCode == WriteResponseStatus.OTHER_ERROR
+ final String explanationCode = ourCode == EntityUpdateStatus.OTHER_ERROR
? p4Code.name() + " " : "";
final String details = p4Error.hasDetails()
? ", " + p4Error.getDetails().toString() : "";
@@ -317,42 +325,50 @@
}
/**
- * Internal implementation of WriteEntityResponse.
+ * Internal implementation of EntityUpdateResponse.
*/
- private static final class WriteEntityResponseImpl implements WriteEntityResponse {
+ private static final class EntityUpdateResponseImpl implements EntityUpdateResponse {
private final PiHandle handle;
private final PiEntity entity;
private final UpdateType updateType;
- private WriteResponseStatus status = WriteResponseStatus.PENDING;
+ private EntityUpdateStatus status = EntityUpdateStatus.PENDING;
private String explanation;
private Throwable throwable;
- private WriteEntityResponseImpl(PiHandle handle, PiEntity entity, UpdateType updateType) {
+ private EntityUpdateResponseImpl(PiHandle handle, PiEntity entity, UpdateType updateType) {
this.handle = handle;
this.entity = entity;
this.updateType = updateType;
}
- private WriteEntityResponseImpl withFailure(
- String explanation, WriteResponseStatus status) {
- this.status = status;
+ private EntityUpdateResponseImpl withFailure(
+ String explanation, EntityUpdateStatus status) {
+ setStatus(status);
this.explanation = explanation;
this.throwable = null;
return this;
}
private void setSuccess() {
- this.status = WriteResponseStatus.OK;
+ setStatus(EntityUpdateStatus.OK);
}
private void setFailure(Throwable throwable) {
- this.status = WriteResponseStatus.OTHER_ERROR;
+ setStatus(EntityUpdateStatus.OTHER_ERROR);
this.explanation = throwable.toString();
this.throwable = throwable;
}
+ private void setStatus(EntityUpdateStatus newStatus) {
+ checkState(this.status == EntityUpdateStatus.PENDING,
+ "Cannot set status for non-pending update");
+ checkArgument(newStatus != EntityUpdateStatus.PENDING,
+ "newStatus must be different than pending");
+ this.status = newStatus;
+ }
+
@Override
public PiHandle handle() {
return handle;
@@ -375,11 +391,11 @@
@Override
public boolean isSuccess() {
- return status().equals(WriteResponseStatus.OK);
+ return status().equals(EntityUpdateStatus.OK);
}
@Override
- public WriteResponseStatus status() {
+ public EntityUpdateStatus status() {
return status;
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java
index 7946394..5110274 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java
@@ -91,8 +91,18 @@
.getPreamble().getName()))
.withId(PiActionProfileGroupId.of(msg.getGroupId()))
.withMaxSize(msg.getMaxSize());
- msg.getMembersList().forEach(m -> piGroupBuilder.addMember(
- PiActionProfileMemberId.of(m.getMemberId()), m.getWeight()));
+ msg.getMembersList().forEach(m -> {
+ final int weight;
+ if (m.getWeight() < 1) {
+ log.warn("Decoding group with invalid weight '{}', will set to 1",
+ m.getWeight());
+ weight = 1;
+ } else {
+ weight = m.getWeight();
+ }
+ piGroupBuilder.addMember(
+ PiActionProfileMemberId.of(m.getMemberId()), weight);
+ });
return piGroupBuilder.build();
}
}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 481a310..35a64b1 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -17,6 +17,7 @@
package org.onosproject.provider.general.device.impl;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
import org.onlab.packet.ChassisId;
import org.onlab.util.ItemNotFoundException;
import org.onlab.util.Tools;
@@ -79,21 +80,18 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.OP_TIMEOUT_SHORT;
-import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.OP_TIMEOUT_SHORT_DEFAULT;
import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_INTERVAL;
import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_INTERVAL_DEFAULT;
import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_INTERVAL;
@@ -110,7 +108,6 @@
property = {
PROBE_INTERVAL + ":Integer=" + PROBE_INTERVAL_DEFAULT,
STATS_POLL_INTERVAL + ":Integer=" + STATS_POLL_INTERVAL_DEFAULT,
- OP_TIMEOUT_SHORT + ":Integer=" + OP_TIMEOUT_SHORT_DEFAULT,
})
public class GeneralDeviceProvider extends AbstractProvider
implements DeviceProvider {
@@ -164,15 +161,16 @@
private GnmiDeviceStateSubscriber gnmiDeviceStateSubscriber;
- /** Configure interval for checking device availability; default is 10 sec. */
+ /**
+ * Configure interval for checking device availability; default is 10 sec.
+ */
private int probeInterval = PROBE_INTERVAL_DEFAULT;
- /** Configure poll frequency for port status and stats; default is 10 sec. */
+ /**
+ * Configure poll frequency for port status and stats; default is 10 sec.
+ */
private int statsPollInterval = STATS_POLL_INTERVAL_DEFAULT;
- /** Configure timeout in seconds for device operations; default is 10 sec. */
- private int opTimeoutShort = OP_TIMEOUT_SHORT_DEFAULT;
-
private final Map<DeviceId, DeviceHandshaker> handshakersWithListeners = Maps.newConcurrentMap();
private final Map<DeviceId, Long> lastProbedAvailability = Maps.newConcurrentMap();
private final InternalPipeconfWatchdogListener pipeconfWatchdogListener = new InternalPipeconfWatchdogListener();
@@ -233,10 +231,6 @@
properties, STATS_POLL_INTERVAL, STATS_POLL_INTERVAL_DEFAULT);
log.info("Configured. {} is configured to {} seconds",
STATS_POLL_INTERVAL, statsPollInterval);
- opTimeoutShort = Tools.getIntegerProperty(
- properties, OP_TIMEOUT_SHORT, OP_TIMEOUT_SHORT_DEFAULT);
- log.info("Configured. {} is configured to {} seconds",
- OP_TIMEOUT_SHORT, opTimeoutShort);
if (oldProbeFrequency != probeInterval) {
startOrRescheduleProbeTask();
@@ -391,9 +385,16 @@
final CompletableFuture<Boolean> modifyTask = enable
? portAdmin.enable(portNumber)
: portAdmin.disable(portNumber);
- final String descr = (enable ? "enabling" : "disabling") + " port " + portNumber;
- getFutureWithDeadline(
- modifyTask, descr, deviceId, null, opTimeoutShort);
+ final String descr = format("%s port %s on %s",
+ (enable ? "enable" : "disable"),
+ portNumber, deviceId);
+ modifyTask.whenComplete((success, ex) -> {
+ if (ex != null) {
+ log.error("Exception while trying to " + descr, ex);
+ } else if (!success) {
+ log.warn("Unable to " + descr);
+ }
+ });
}
@Override
@@ -637,10 +638,7 @@
handshaker.addDeviceAgentListener(id(), deviceAgentListener);
handshakersWithListeners.put(deviceId, handshaker);
// Start connection via handshaker.
- final Boolean connectSuccess = getFutureWithDeadline(
- handshaker.connect(), "initiating connection",
- deviceId, false, opTimeoutShort);
- if (!connectSuccess) {
+ if (!Futures.getUnchecked(handshaker.connect())) {
// Failed! Remove listeners.
handshaker.removeDeviceAgentListener(id());
handshakersWithListeners.remove(deviceId);
@@ -681,16 +679,12 @@
private boolean probeAvailability(DeviceHandshaker handshaker) {
lastProbedAvailability.put(handshaker.data().deviceId(), currentTimeMillis());
- return getFutureWithDeadline(
- handshaker.probeAvailability(), "probing availability",
- handshaker.data().deviceId(), false, opTimeoutShort);
+ return Futures.getUnchecked(handshaker.probeAvailability());
}
private boolean probeReachability(DeviceHandshaker handshaker) {
lastProbedAvailability.put(handshaker.data().deviceId(), currentTimeMillis());
- return getFutureWithDeadline(
- handshaker.probeReachability(), "probing reachability",
- handshaker.data().deviceId(), false, opTimeoutShort);
+ return Futures.getUnchecked(handshaker.probeReachability());
}
private void markOfflineIfNeeded(DeviceId deviceId) {
@@ -928,19 +922,4 @@
private boolean isPipelineProgrammable(DeviceId deviceId) {
return hasBehaviour(deviceId, PiPipelineProgrammable.class);
}
-
- private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
- DeviceId deviceId, U defaultValue, int timeout) {
- try {
- return future.get(timeout, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.error("Thread interrupted while {} on {}", opDescription, deviceId);
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
- } catch (TimeoutException e) {
- log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
- }
- return defaultValue;
- }
}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java
index 358ce5f..1f2dc09 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java
@@ -29,7 +29,4 @@
public static final String PROBE_INTERVAL = "deviceProbeInterval";
public static final int PROBE_INTERVAL_DEFAULT = 10;
- public static final String OP_TIMEOUT_SHORT = "deviceOperationTimeoutShort";
- public static final int OP_TIMEOUT_SHORT_DEFAULT = 10;
-
}