blob: 7afd97ba085a711efb7df42bcac20c6ecead38dd [file] [log] [blame]
Carmelo Cascone4c289b72019-01-22 15:30:45 -08001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl.client;
18
19import com.google.protobuf.TextFormat;
20import io.grpc.Status;
21import io.grpc.StatusRuntimeException;
22import io.grpc.stub.ClientCallStreamObserver;
23import io.grpc.stub.StreamObserver;
24import org.onosproject.net.DeviceId;
25import org.onosproject.net.pi.model.PiPipeconf;
26import org.onosproject.net.pi.runtime.PiPacketOperation;
27import org.onosproject.net.pi.service.PiPipeconfService;
28import org.onosproject.p4runtime.api.P4RuntimeEvent;
29import org.onosproject.p4runtime.api.P4RuntimeStreamClient;
30import org.onosproject.p4runtime.ctl.codec.CodecException;
31import org.onosproject.p4runtime.ctl.controller.ArbitrationUpdateEvent;
32import org.onosproject.p4runtime.ctl.controller.ChannelEvent;
33import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
34import org.onosproject.p4runtime.ctl.controller.PacketInEvent;
35import org.slf4j.Logger;
36import p4.v1.P4RuntimeOuterClass;
37import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
38import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
39
40import java.math.BigInteger;
41import java.net.ConnectException;
42import java.nio.ByteBuffer;
43import java.util.concurrent.atomic.AtomicBoolean;
44
45import static java.lang.String.format;
46import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
47import static org.slf4j.LoggerFactory.getLogger;
48
49/**
50 * Implementation of P4RuntimeStreamClient. Handles P4Runtime StreamChannel RPC
51 * operations, such as arbitration update and packet-in/out.
52 */
53public final class StreamClientImpl implements P4RuntimeStreamClient {
54
55 private static final Logger log = getLogger(StreamClientImpl.class);
56
57 private static final BigInteger ONE_THOUSAND = BigInteger.valueOf(1000);
58
59 private final P4RuntimeClientImpl client;
60 private final DeviceId deviceId;
61 private final long p4DeviceId;
62 private final PiPipeconfService pipeconfService;
63 private final P4RuntimeControllerImpl controller;
64 private final StreamChannelManager streamChannelManager = new StreamChannelManager();
65
66 private P4RuntimeOuterClass.Uint128 lastUsedElectionId = P4RuntimeOuterClass.Uint128
67 .newBuilder().setLow(1).build();
68
69 private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
70
71 StreamClientImpl(
72 PiPipeconfService pipeconfService,
73 P4RuntimeClientImpl client,
74 P4RuntimeControllerImpl controller) {
75 this.client = client;
76 this.deviceId = client.deviceId();
77 this.p4DeviceId = client.p4DeviceId();
78 this.pipeconfService = pipeconfService;
79 this.controller = controller;
80 }
81
82 @Override
83 public void openSession() {
84 if (isSessionOpen()) {
85 log.debug("Dropping request to open session for {}, session is already open",
86 deviceId);
87 return;
88 }
89 log.debug("Opening session for {}...", deviceId);
90 sendMasterArbitrationUpdate(controller.newMasterElectionId(deviceId));
91
92 }
93
94 @Override
95 public boolean isSessionOpen() {
96 return streamChannelManager.isOpen();
97 }
98
99 @Override
100 public void closeSession() {
101 streamChannelManager.complete();
102 }
103
104 @Override
105 public void runForMastership() {
106 if (!isSessionOpen()) {
107 log.debug("Dropping mastership request for {}, session is closed",
108 deviceId);
109 return;
110 }
111 // Becoming master is a race. Here we increase our chances of win, i.e.
112 // using the highest election ID, against other ONOS nodes in the
113 // cluster that are calling openSession() (which is used to start the
114 // stream RPC session, not to become master).
115 log.debug("Running for mastership on {}...", deviceId);
116 final BigInteger masterId = controller.newMasterElectionId(deviceId)
117 .add(ONE_THOUSAND);
118 sendMasterArbitrationUpdate(masterId);
119 }
120
121 @Override
122 public boolean isMaster() {
123 return streamChannelManager.isOpen() && isClientMaster.get();
124 }
125
126 @Override
127 public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
128 if (!isSessionOpen()) {
129 log.debug("Dropping packet-out request for {}, session is closed",
130 deviceId);
131 return;
132 }
133 if (log.isTraceEnabled()) {
134 log.trace("Sending packet-out to {}: {}", deviceId, packet);
135 }
136 try {
137 // Encode the PiPacketOperation into a PacketOut
138 final P4RuntimeOuterClass.PacketOut packetOut =
139 CODECS.packetOut().encode(packet, null, pipeconf);
140 // Build the request
141 final StreamMessageRequest packetOutRequest = StreamMessageRequest
142 .newBuilder().setPacket(packetOut).build();
143 // Send.
144 streamChannelManager.sendIfOpen(packetOutRequest);
145 } catch (CodecException e) {
146 log.error("Unable to send packet-out: {}", e.getMessage());
147 }
148 }
149
150 private void sendMasterArbitrationUpdate(BigInteger electionId) {
151 log.debug("Sending arbitration update to {}... electionId={}",
152 deviceId, electionId);
153 final P4RuntimeOuterClass.Uint128 idMsg = bigIntegerToUint128(electionId);
154 streamChannelManager.send(
155 StreamMessageRequest.newBuilder()
156 .setArbitration(
157 P4RuntimeOuterClass.MasterArbitrationUpdate
158 .newBuilder()
159 .setDeviceId(p4DeviceId)
160 .setElectionId(idMsg)
161 .build())
162 .build());
163 lastUsedElectionId = idMsg;
164 }
165
166 private P4RuntimeOuterClass.Uint128 bigIntegerToUint128(BigInteger value) {
167 final byte[] arr = value.toByteArray();
168 final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
169 .put(new byte[Long.BYTES * 2 - arr.length])
170 .put(arr);
171 bb.rewind();
172 return P4RuntimeOuterClass.Uint128.newBuilder()
173 .setHigh(bb.getLong())
174 .setLow(bb.getLong())
175 .build();
176 }
177
178 private BigInteger uint128ToBigInteger(P4RuntimeOuterClass.Uint128 value) {
179 return new BigInteger(
180 ByteBuffer.allocate(Long.BYTES * 2)
181 .putLong(value.getHigh())
182 .putLong(value.getLow())
183 .array());
184 }
185
186 private void handlePacketIn(P4RuntimeOuterClass.PacketIn packetInMsg) {
187 if (log.isTraceEnabled()) {
188 log.trace("Received packet-in from {}: {}", deviceId, packetInMsg);
189 }
190 if (!pipeconfService.getPipeconf(deviceId).isPresent()) {
191 log.warn("Unable to handle packet-in from {}, missing pipeconf: {}",
192 deviceId, TextFormat.shortDebugString(packetInMsg));
193 return;
194 }
195 // Decode packet message and post event.
196 // TODO: consider implementing a cache to speed up
197 // encoding/deconding of packet-in/out (e.g. LLDP, ARP)
198 final PiPipeconf pipeconf = pipeconfService.getPipeconf(deviceId).get();
199 final PiPacketOperation pktOperation;
200 try {
201 pktOperation = CODECS.packetIn().decode(
202 packetInMsg, null, pipeconf);
203 } catch (CodecException e) {
204 log.warn("Unable to process packet-int: {}", e.getMessage());
205 return;
206 }
207 controller.postEvent(new P4RuntimeEvent(
208 P4RuntimeEvent.Type.PACKET_IN,
209 new PacketInEvent(deviceId, pktOperation)));
210 }
211
212 private void handleArbitrationUpdate(P4RuntimeOuterClass.MasterArbitrationUpdate msg) {
213 // From the spec...
214 // - Election_id: The stream RPC with the highest election_id is the
215 // master. Switch populates with the highest election ID it
216 // has received from all connected controllers.
217 // - Status: Switch populates this with OK for the client that is the
218 // master, and with an error status for all other connected clients (at
219 // every mastership change).
220 if (!msg.hasElectionId() || !msg.hasStatus()) {
221 return;
222 }
223 final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
224 log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
225 deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
226 controller.postEvent(new P4RuntimeEvent(
227 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
228 new ArbitrationUpdateEvent(deviceId, isMaster)));
229 isClientMaster.set(isMaster);
230 }
231
232 /**
233 * Returns the election ID last used in a MasterArbitrationUpdate message
234 * sent by the client to the server.
235 *
236 * @return election ID uint128 protobuf message
237 */
238 P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
239 return lastUsedElectionId;
240 }
241
242 /**
243 * A manager for the P4Runtime stream channel that opportunistically creates
244 * new stream RCP stubs (e.g. when one fails because of errors) and posts
245 * channel events via the P4Runtime controller.
246 */
247 private final class StreamChannelManager {
248
249 private final AtomicBoolean open = new AtomicBoolean(false);
250 private final StreamObserver<StreamMessageResponse> responseObserver =
251 new InternalStreamResponseObserver(this);
252 private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
253
254 void send(StreamMessageRequest value) {
255 synchronized (this) {
256 initIfRequired();
257 doSend(value);
258 }
259 }
260
261 void sendIfOpen(StreamMessageRequest value) {
262 // We do not lock here, but we ignore NPEs due to stream RPC not
263 // being active (null requestObserver). Good for frequent
264 // packet-outs.
265 try {
266 doSend(value);
267 } catch (NullPointerException e) {
268 if (requestObserver != null) {
269 // Must be something else.
270 throw e;
271 }
272 }
273 }
274
275 private void doSend(StreamMessageRequest value) {
276 try {
277 requestObserver.onNext(value);
278 } catch (Throwable ex) {
279 if (ex instanceof StatusRuntimeException) {
280 log.warn("Unable to send {} to {}: {}",
281 value.getUpdateCase().toString(), deviceId, ex.getMessage());
282 } else {
283 log.error("Exception while sending {} to {}: {}",
284 value.getUpdateCase().toString(), deviceId, ex);
285 }
286 complete();
287 }
288 }
289
290 private void initIfRequired() {
291 if (requestObserver == null) {
292 log.debug("Creating new stream channel for {}...", deviceId);
293 open.set(false);
294 client.execRpcNoTimeout(
295 s -> requestObserver =
296 (ClientCallStreamObserver<StreamMessageRequest>)
297 s.streamChannel(responseObserver)
298 );
299 }
300 }
301
302 void complete() {
303 synchronized (this) {
304 signalClosed();
305 if (requestObserver != null) {
306 requestObserver.onCompleted();
307 requestObserver.cancel("Completed", null);
308 requestObserver = null;
309 }
310 }
311 }
312
313 void signalOpen() {
314 synchronized (this) {
315 final boolean wasOpen = open.getAndSet(true);
316 if (!wasOpen) {
317 controller.postEvent(new P4RuntimeEvent(
318 P4RuntimeEvent.Type.CHANNEL_EVENT,
319 new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
320 }
321 }
322 }
323
324 void signalClosed() {
325 synchronized (this) {
326 final boolean wasOpen = open.getAndSet(false);
327 if (wasOpen) {
328 controller.postEvent(new P4RuntimeEvent(
329 P4RuntimeEvent.Type.CHANNEL_EVENT,
330 new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
331 }
332 }
333 }
334
335 boolean isOpen() {
336 return open.get();
337 }
338 }
339
340 /**
341 * Handles messages received from the device on the stream channel.
342 */
343 private final class InternalStreamResponseObserver
344 implements StreamObserver<StreamMessageResponse> {
345
346 private final StreamChannelManager streamChannelManager;
347
348 private InternalStreamResponseObserver(
349 StreamChannelManager streamChannelManager) {
350 this.streamChannelManager = streamChannelManager;
351 }
352
353 @Override
354 public void onNext(StreamMessageResponse message) {
355 streamChannelManager.signalOpen();
356 try {
357 if (log.isTraceEnabled()) {
358 log.trace(
359 "Received {} from {}: {}",
360 message.getUpdateCase(), deviceId,
361 TextFormat.shortDebugString(message));
362 }
363 switch (message.getUpdateCase()) {
364 case PACKET:
365 handlePacketIn(message.getPacket());
366 return;
367 case ARBITRATION:
368 handleArbitrationUpdate(message.getArbitration());
369 return;
370 default:
371 log.warn("Unrecognized StreamMessageResponse from {}: {}",
372 deviceId, message.getUpdateCase());
373 }
374 } catch (Throwable ex) {
375 log.error("Exception while processing stream message from {}",
376 deviceId, ex);
377 }
378 }
379
380 @Override
381 public void onError(Throwable throwable) {
382 if (throwable instanceof StatusRuntimeException) {
383 final StatusRuntimeException sre = (StatusRuntimeException) throwable;
384 if (sre.getStatus().getCause() instanceof ConnectException) {
385 log.warn("{} is unreachable ({})",
386 deviceId, sre.getCause().getMessage());
387 } else {
388 log.warn("Error on stream channel for {}: {}",
389 deviceId, throwable.getMessage());
390 }
391 } else {
392 log.error(format("Exception on stream channel for %s",
393 deviceId), throwable);
394 }
395 streamChannelManager.complete();
396 }
397
398 @Override
399 public void onCompleted() {
400 log.warn("Stream channel for {} has completed", deviceId);
401 streamChannelManager.complete();
402 }
403 }
404}