blob: 353d44e0d43f6bda04db52a7de12ef82e8a7d1e7 [file] [log] [blame]
/*
* Copyright 2019-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.p4runtime.ctl.client;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.runtime.PiPacketOperation;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.ctl.controller.BaseEventSubject;
import org.onosproject.p4runtime.ctl.controller.ChannelEvent;
import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
import p4.v1.P4RuntimeGrpc;
import p4.v1.P4RuntimeOuterClass;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
/**
* Implementation of P4RuntimeClient.
*/
public final class P4RuntimeClientImpl
extends AbstractGrpcClient implements P4RuntimeClient {
// TODO: consider making timeouts configurable per-device via netcfg
/**
* Timeout in seconds for short/fast RPCs.
*/
static final int SHORT_TIMEOUT_SECONDS = 10;
/**
* Timeout in seconds for RPCs that involve transfer of potentially large
* amount of data. This shoulld be long enough to allow for network delay
* (e.g. to transfer large pipeline binaries over slow network).
*/
static final int LONG_TIMEOUT_SECONDS = 60;
private final long p4DeviceId;
private final ManagedChannel channel;
private final P4RuntimeControllerImpl controller;
private final StreamClientImpl streamClient;
private final PipelineConfigClientImpl pipelineConfigClient;
/**
* Instantiates a new client with the given arguments.
*
* @param clientKey client key
* @param channel gRPC managed channel
* @param controller P$Runtime controller instance
* @param pipeconfService pipeconf service instance
*/
public P4RuntimeClientImpl(P4RuntimeClientKey clientKey,
ManagedChannel channel,
P4RuntimeControllerImpl controller,
PiPipeconfService pipeconfService) {
super(clientKey);
checkNotNull(channel);
checkNotNull(controller);
checkNotNull(pipeconfService);
this.p4DeviceId = clientKey.p4DeviceId();
this.channel = channel;
this.controller = controller;
this.streamClient = new StreamClientImpl(
pipeconfService, this, controller);
this.pipelineConfigClient = new PipelineConfigClientImpl(this);
}
@Override
protected Void doShutdown() {
streamClient.closeSession();
return super.doShutdown();
}
@Override
public CompletableFuture<Boolean> setPipelineConfig(
PiPipeconf pipeconf, ByteBuffer deviceData) {
return pipelineConfigClient.setPipelineConfig(pipeconf, deviceData);
}
@Override
public CompletableFuture<Boolean> isPipelineConfigSet(
PiPipeconf pipeconf, ByteBuffer deviceData) {
return pipelineConfigClient.isPipelineConfigSet(pipeconf, deviceData);
}
@Override
public ReadRequest read(PiPipeconf pipeconf) {
return new ReadRequestImpl(this, pipeconf);
}
@Override
public void openSession() {
streamClient.openSession();
}
@Override
public boolean isSessionOpen() {
return streamClient.isSessionOpen();
}
@Override
public void closeSession() {
streamClient.closeSession();
}
@Override
public void runForMastership() {
streamClient.runForMastership();
}
@Override
public boolean isMaster() {
return streamClient.isMaster();
}
@Override
public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
streamClient.packetOut(packet, pipeconf);
}
@Override
public WriteRequest write(PiPipeconf pipeconf) {
return new WriteRequestImpl(this, pipeconf);
}
/**
* Returns the P4Runtime-internal device ID associated with this client.
*
* @return P4Runtime-internal device ID
*/
long p4DeviceId() {
return this.p4DeviceId;
}
/**
* Returns the ONOS device ID associated with this client.
*
* @return ONOS device ID
*/
DeviceId deviceId() {
return this.deviceId;
}
/**
* Returns the election ID last used in a MasterArbitrationUpdate message
* sent by the client to the server. No guarantees are given that this is
* the current election ID associated to the session, nor that the server
* has acknowledged this value as valid.
*
* @return election ID uint128 protobuf message
*/
P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
return streamClient.lastUsedElectionId();
}
/**
* Forces execution of an RPC in a cancellable context with the given
* timeout (in seconds).
*
* @param stubConsumer P4Runtime stub consumer
* @param timeout timeout in seconds
*/
void execRpc(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer, int timeout) {
if (log.isTraceEnabled()) {
log.trace("Executing RPC with timeout {} seconds (context deadline {})...",
timeout, context().getDeadline());
}
runInCancellableContext(() -> stubConsumer.accept(
P4RuntimeGrpc.newStub(channel)
.withDeadlineAfter(timeout, TimeUnit.SECONDS)));
}
/**
* Forces execution of an RPC in a cancellable context with no timeout.
*
* @param stubConsumer P4Runtime stub consumer
*/
void execRpcNoTimeout(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer) {
if (log.isTraceEnabled()) {
log.trace("Executing RPC with no timeout (context deadline {})...",
context().getDeadline());
}
runInCancellableContext(() -> stubConsumer.accept(
P4RuntimeGrpc.newStub(channel)));
}
/**
* Logs the error and checks it for any condition that might be of interest
* for the controller.
*
* @param throwable throwable
* @param opDescription operation description for logging
*/
void handleRpcError(Throwable throwable, String opDescription) {
if (throwable instanceof StatusRuntimeException) {
final StatusRuntimeException sre = (StatusRuntimeException) throwable;
checkGrpcException(sre);
final String logMsg;
if (sre.getCause() == null) {
logMsg = sre.getMessage();
} else {
logMsg = format("%s (%s)", sre.getMessage(), sre.getCause().toString());
}
log.warn("Error while performing {} on {}: {}",
opDescription, deviceId, logMsg);
log.debug("", throwable);
return;
}
log.error(format("Exception while performing %s on %s",
opDescription, deviceId), throwable);
}
private void checkGrpcException(StatusRuntimeException sre) {
switch (sre.getStatus().getCode()) {
case PERMISSION_DENIED:
// Notify upper layers that this node is not master.
controller.postEvent(new P4RuntimeEvent(
P4RuntimeEvent.Type.PERMISSION_DENIED,
new BaseEventSubject(deviceId)));
break;
case UNAVAILABLE:
// Channel might be closed.
controller.postEvent(new P4RuntimeEvent(
P4RuntimeEvent.Type.CHANNEL_EVENT,
new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
break;
default:
break;
}
}
}