blob: 353d44e0d43f6bda04db52a7de12ef82e8a7d1e7 [file] [log] [blame]
Carmelo Cascone4c289b72019-01-22 15:30:45 -08001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl.client;
18
19import io.grpc.ManagedChannel;
20import io.grpc.StatusRuntimeException;
21import org.onosproject.grpc.ctl.AbstractGrpcClient;
22import org.onosproject.net.DeviceId;
23import org.onosproject.net.pi.model.PiPipeconf;
24import org.onosproject.net.pi.runtime.PiPacketOperation;
25import org.onosproject.net.pi.service.PiPipeconfService;
26import org.onosproject.p4runtime.api.P4RuntimeClient;
27import org.onosproject.p4runtime.api.P4RuntimeClientKey;
28import org.onosproject.p4runtime.api.P4RuntimeEvent;
29import org.onosproject.p4runtime.ctl.controller.BaseEventSubject;
30import org.onosproject.p4runtime.ctl.controller.ChannelEvent;
31import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
32import p4.v1.P4RuntimeGrpc;
33import p4.v1.P4RuntimeOuterClass;
34
35import java.nio.ByteBuffer;
36import java.util.concurrent.CompletableFuture;
37import java.util.concurrent.TimeUnit;
38import java.util.function.Consumer;
39
40import static com.google.common.base.Preconditions.checkNotNull;
41import static java.lang.String.format;
42
43/**
44 * Implementation of P4RuntimeClient.
45 */
46public final class P4RuntimeClientImpl
47 extends AbstractGrpcClient implements P4RuntimeClient {
48
49 // TODO: consider making timeouts configurable per-device via netcfg
50 /**
51 * Timeout in seconds for short/fast RPCs.
52 */
53 static final int SHORT_TIMEOUT_SECONDS = 10;
54 /**
55 * Timeout in seconds for RPCs that involve transfer of potentially large
56 * amount of data. This shoulld be long enough to allow for network delay
57 * (e.g. to transfer large pipeline binaries over slow network).
58 */
59 static final int LONG_TIMEOUT_SECONDS = 60;
60
61 private final long p4DeviceId;
62 private final ManagedChannel channel;
63 private final P4RuntimeControllerImpl controller;
64 private final StreamClientImpl streamClient;
65 private final PipelineConfigClientImpl pipelineConfigClient;
66
67 /**
68 * Instantiates a new client with the given arguments.
69 *
70 * @param clientKey client key
71 * @param channel gRPC managed channel
72 * @param controller P$Runtime controller instance
73 * @param pipeconfService pipeconf service instance
74 */
75 public P4RuntimeClientImpl(P4RuntimeClientKey clientKey,
76 ManagedChannel channel,
77 P4RuntimeControllerImpl controller,
78 PiPipeconfService pipeconfService) {
79 super(clientKey);
80 checkNotNull(channel);
81 checkNotNull(controller);
82 checkNotNull(pipeconfService);
83
84 this.p4DeviceId = clientKey.p4DeviceId();
85 this.channel = channel;
86 this.controller = controller;
87 this.streamClient = new StreamClientImpl(
88 pipeconfService, this, controller);
89 this.pipelineConfigClient = new PipelineConfigClientImpl(this);
90 }
91
92 @Override
93 protected Void doShutdown() {
94 streamClient.closeSession();
95 return super.doShutdown();
96 }
97
98 @Override
99 public CompletableFuture<Boolean> setPipelineConfig(
100 PiPipeconf pipeconf, ByteBuffer deviceData) {
101 return pipelineConfigClient.setPipelineConfig(pipeconf, deviceData);
102 }
103
104 @Override
105 public CompletableFuture<Boolean> isPipelineConfigSet(
106 PiPipeconf pipeconf, ByteBuffer deviceData) {
107 return pipelineConfigClient.isPipelineConfigSet(pipeconf, deviceData);
108 }
109
110 @Override
111 public ReadRequest read(PiPipeconf pipeconf) {
112 return new ReadRequestImpl(this, pipeconf);
113 }
114
115 @Override
116 public void openSession() {
117 streamClient.openSession();
118 }
119
120 @Override
121 public boolean isSessionOpen() {
122 return streamClient.isSessionOpen();
123 }
124
125 @Override
126 public void closeSession() {
127 streamClient.closeSession();
128 }
129
130 @Override
131 public void runForMastership() {
132 streamClient.runForMastership();
133 }
134
135 @Override
136 public boolean isMaster() {
137 return streamClient.isMaster();
138 }
139
140 @Override
141 public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
142 streamClient.packetOut(packet, pipeconf);
143 }
144
145 @Override
146 public WriteRequest write(PiPipeconf pipeconf) {
147 return new WriteRequestImpl(this, pipeconf);
148 }
149
150 /**
151 * Returns the P4Runtime-internal device ID associated with this client.
152 *
153 * @return P4Runtime-internal device ID
154 */
155 long p4DeviceId() {
156 return this.p4DeviceId;
157 }
158
159 /**
160 * Returns the ONOS device ID associated with this client.
161 *
162 * @return ONOS device ID
163 */
164 DeviceId deviceId() {
165 return this.deviceId;
166 }
167
168 /**
169 * Returns the election ID last used in a MasterArbitrationUpdate message
170 * sent by the client to the server. No guarantees are given that this is
171 * the current election ID associated to the session, nor that the server
172 * has acknowledged this value as valid.
173 *
174 * @return election ID uint128 protobuf message
175 */
176 P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
177 return streamClient.lastUsedElectionId();
178 }
179
180 /**
181 * Forces execution of an RPC in a cancellable context with the given
182 * timeout (in seconds).
183 *
184 * @param stubConsumer P4Runtime stub consumer
185 * @param timeout timeout in seconds
186 */
187 void execRpc(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer, int timeout) {
188 if (log.isTraceEnabled()) {
189 log.trace("Executing RPC with timeout {} seconds (context deadline {})...",
190 timeout, context().getDeadline());
191 }
192 runInCancellableContext(() -> stubConsumer.accept(
193 P4RuntimeGrpc.newStub(channel)
194 .withDeadlineAfter(timeout, TimeUnit.SECONDS)));
195 }
196
197 /**
198 * Forces execution of an RPC in a cancellable context with no timeout.
199 *
200 * @param stubConsumer P4Runtime stub consumer
201 */
202 void execRpcNoTimeout(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer) {
203 if (log.isTraceEnabled()) {
204 log.trace("Executing RPC with no timeout (context deadline {})...",
205 context().getDeadline());
206 }
207 runInCancellableContext(() -> stubConsumer.accept(
208 P4RuntimeGrpc.newStub(channel)));
209 }
210
211 /**
212 * Logs the error and checks it for any condition that might be of interest
213 * for the controller.
214 *
215 * @param throwable throwable
216 * @param opDescription operation description for logging
217 */
218 void handleRpcError(Throwable throwable, String opDescription) {
219 if (throwable instanceof StatusRuntimeException) {
220 final StatusRuntimeException sre = (StatusRuntimeException) throwable;
221 checkGrpcException(sre);
222 final String logMsg;
223 if (sre.getCause() == null) {
224 logMsg = sre.getMessage();
225 } else {
226 logMsg = format("%s (%s)", sre.getMessage(), sre.getCause().toString());
227 }
228 log.warn("Error while performing {} on {}: {}",
229 opDescription, deviceId, logMsg);
230 log.debug("", throwable);
231 return;
232 }
233 log.error(format("Exception while performing %s on %s",
234 opDescription, deviceId), throwable);
235 }
236
237 private void checkGrpcException(StatusRuntimeException sre) {
238 switch (sre.getStatus().getCode()) {
239 case PERMISSION_DENIED:
240 // Notify upper layers that this node is not master.
241 controller.postEvent(new P4RuntimeEvent(
242 P4RuntimeEvent.Type.PERMISSION_DENIED,
243 new BaseEventSubject(deviceId)));
244 break;
245 case UNAVAILABLE:
246 // Channel might be closed.
247 controller.postEvent(new P4RuntimeEvent(
248 P4RuntimeEvent.Type.CHANNEL_EVENT,
249 new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
250 break;
251 default:
252 break;
253 }
254 }
255}