blob: 5c64a0c99203f78420a6b4cbd16f39503f7b9240 [file] [log] [blame]
Carmelo Cascone4c289b72019-01-22 15:30:45 -08001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl.client;
18
19import com.google.protobuf.TextFormat;
20import io.grpc.Status;
21import io.grpc.StatusRuntimeException;
22import io.grpc.stub.ClientCallStreamObserver;
23import io.grpc.stub.StreamObserver;
Carmelo Casconeb8a25052019-04-08 15:22:32 -070024import org.onlab.util.SharedScheduledExecutors;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080025import org.onosproject.net.DeviceId;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080026import org.onosproject.net.device.DeviceAgentEvent;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080027import org.onosproject.net.pi.model.PiPipeconf;
28import org.onosproject.net.pi.runtime.PiPacketOperation;
29import org.onosproject.net.pi.service.PiPipeconfService;
30import org.onosproject.p4runtime.api.P4RuntimeEvent;
31import org.onosproject.p4runtime.api.P4RuntimeStreamClient;
32import org.onosproject.p4runtime.ctl.codec.CodecException;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080033import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
34import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore.MasterElectionIdListener;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080035import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
36import org.onosproject.p4runtime.ctl.controller.PacketInEvent;
37import org.slf4j.Logger;
38import p4.v1.P4RuntimeOuterClass;
39import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
40import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
41
42import java.math.BigInteger;
43import java.net.ConnectException;
44import java.nio.ByteBuffer;
Carmelo Casconeb8a25052019-04-08 15:22:32 -070045import java.util.concurrent.ScheduledFuture;
46import java.util.concurrent.TimeUnit;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080047import java.util.concurrent.atomic.AtomicBoolean;
48
Carmelo Cascone3977ea42019-02-28 13:43:42 -080049import static com.google.common.base.Preconditions.checkArgument;
50import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080051import static java.lang.String.format;
Carmelo Casconeb8a25052019-04-08 15:22:32 -070052import static java.lang.System.currentTimeMillis;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080053import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
54import static org.slf4j.LoggerFactory.getLogger;
55
56/**
57 * Implementation of P4RuntimeStreamClient. Handles P4Runtime StreamChannel RPC
Carmelo Casconec2be50a2019-04-10 00:15:39 -070058 * operations, such as arbitration update and packet-in/out, for a given
59 * P4Runtime-internal device ID.
Carmelo Cascone4c289b72019-01-22 15:30:45 -080060 */
61public final class StreamClientImpl implements P4RuntimeStreamClient {
62
63 private static final Logger log = getLogger(StreamClientImpl.class);
64
Carmelo Casconeb8a25052019-04-08 15:22:32 -070065 private static final int ARBITRATION_RETRY_SECONDS = 3;
66 private static final int ARBITRATION_TIMEOUT_SECONDS = 15;
67
Carmelo Cascone4c289b72019-01-22 15:30:45 -080068 private final P4RuntimeClientImpl client;
69 private final DeviceId deviceId;
70 private final long p4DeviceId;
71 private final PiPipeconfService pipeconfService;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080072 private final MasterElectionIdStore masterElectionIdStore;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080073 private final P4RuntimeControllerImpl controller;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080074
Carmelo Cascone4c289b72019-01-22 15:30:45 -080075 private final StreamChannelManager streamChannelManager = new StreamChannelManager();
Carmelo Cascone3977ea42019-02-28 13:43:42 -080076 private final MasterElectionIdListener masterElectionIdListener = new InternalMasterElectionIdListener();
Carmelo Cascone4c289b72019-01-22 15:30:45 -080077
Carmelo Cascone3977ea42019-02-28 13:43:42 -080078 private final AtomicBoolean isMaster = new AtomicBoolean(false);
79 private final AtomicBoolean requestedToBeMaster = new AtomicBoolean(false);
Carmelo Cascone4c289b72019-01-22 15:30:45 -080080
Carmelo Cascone3977ea42019-02-28 13:43:42 -080081 private BigInteger pendingElectionId = null;
82 private BigInteger lastUsedElectionId = null;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080083
Carmelo Casconeb8a25052019-04-08 15:22:32 -070084 private ScheduledFuture<?> pendingElectionIdRetryTask = null;
85 private long pendingElectionIdTimestamp = 0;
86
Carmelo Cascone4c289b72019-01-22 15:30:45 -080087 StreamClientImpl(
88 PiPipeconfService pipeconfService,
Carmelo Cascone3977ea42019-02-28 13:43:42 -080089 MasterElectionIdStore masterElectionIdStore,
Carmelo Cascone4c289b72019-01-22 15:30:45 -080090 P4RuntimeClientImpl client,
Carmelo Casconec2be50a2019-04-10 00:15:39 -070091 long p4DeviceId,
Carmelo Cascone4c289b72019-01-22 15:30:45 -080092 P4RuntimeControllerImpl controller) {
93 this.client = client;
94 this.deviceId = client.deviceId();
Carmelo Casconec2be50a2019-04-10 00:15:39 -070095 this.p4DeviceId = p4DeviceId;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080096 this.pipeconfService = pipeconfService;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080097 this.masterElectionIdStore = masterElectionIdStore;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080098 this.controller = controller;
99 }
100
101 @Override
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700102 public boolean isSessionOpen(long p4DeviceId) {
103 checkArgument(this.p4DeviceId == p4DeviceId);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800104 return streamChannelManager.isOpen();
105 }
106
107 @Override
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700108 public void closeSession(long p4DeviceId) {
109 checkArgument(this.p4DeviceId == p4DeviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800110 synchronized (requestedToBeMaster) {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700111 this.masterElectionIdStore.unsetListener(deviceId, p4DeviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800112 streamChannelManager.teardown();
113 pendingElectionId = null;
114 requestedToBeMaster.set(false);
115 isMaster.set(false);
116 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800117 }
118
119 @Override
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700120 public void setMastership(long p4DeviceId, boolean master,
121 BigInteger newElectionId) {
122 checkArgument(this.p4DeviceId == p4DeviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800123 checkNotNull(newElectionId);
124 checkArgument(newElectionId.compareTo(BigInteger.ZERO) > 0,
125 "newElectionId must be a non zero positive number");
126 synchronized (requestedToBeMaster) {
127 requestedToBeMaster.set(master);
128 pendingElectionId = newElectionId;
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700129 handlePendingElectionId(masterElectionIdStore.get(
130 deviceId, p4DeviceId));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800131 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800132 }
133
134 private void handlePendingElectionId(BigInteger masterElectionId) {
135 synchronized (requestedToBeMaster) {
136 if (pendingElectionId == null) {
137 // No pending requests.
138 return;
139 }
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700140 // Cancel any pending task. We'll reschedule if needed.
141 if (pendingElectionIdRetryTask != null) {
142 // Do not interrupt if running, as this might be executed by the
143 // pending task itself.
144 pendingElectionIdRetryTask.cancel(false);
145 pendingElectionIdRetryTask = null;
146 }
147 // We implement a deferring mechanism to avoid becoming master when
148 // we shouldn't, i.e. when the requested election ID is bigger than
149 // the master one on the device, but we don't want to be master.
150 // However, we make sure not to defer for more than
151 // ARBITRATION_TIMEOUT_SECONDS.
152 final boolean timeoutExpired;
153 if (pendingElectionIdTimestamp == 0) {
154 pendingElectionIdTimestamp = currentTimeMillis();
155 timeoutExpired = false;
156 } else {
157 timeoutExpired = (currentTimeMillis() - pendingElectionIdTimestamp)
158 > ARBITRATION_TIMEOUT_SECONDS * 1000;
159 }
160 if (timeoutExpired) {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700161 log.warn("Arbitration timeout expired for {}! " +
162 "Will send pending election ID now...",
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700163 deviceId);
164 }
165 if (!timeoutExpired &&
166 !requestedToBeMaster.get() && masterElectionId != null &&
167 pendingElectionId.compareTo(masterElectionId) > 0) {
168 log.info("Deferring sending master arbitration update for {}, master " +
169 "election ID of server ({}) is smaller than " +
170 "requested one ({}), but we do NOT want to be master...",
171 deviceId, masterElectionId, pendingElectionId);
172 // Will try again as soon as the master election ID store is
173 // updated...
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700174 masterElectionIdStore.setListener(
175 deviceId, p4DeviceId, masterElectionIdListener);
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700176 // ..or in ARBITRATION_RETRY_SECONDS at the latest (if we missed
177 // the store event).
178 pendingElectionIdRetryTask = SharedScheduledExecutors.newTimeout(
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700179 () -> handlePendingElectionId(
180 masterElectionIdStore.get(deviceId, p4DeviceId)),
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700181 ARBITRATION_RETRY_SECONDS, TimeUnit.SECONDS);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800182 } else {
183 // Send now.
pierventre14a76e72022-02-28 20:34:42 -0800184 if (log.isDebugEnabled()) {
185 log.debug("Setting mastership on {}... " +
186 "master={}, newElectionId={}, " +
187 "masterElectionId={}, sessionOpen={}",
188 deviceId, requestedToBeMaster.get(),
189 pendingElectionId, masterElectionId,
190 streamChannelManager.isOpen());
191 } else if (!pendingElectionId.equals(lastUsedElectionId)) {
192 log.info("Setting mastership on {}... " +
193 "master={}, newElectionId={}, " +
194 "masterElectionId={}, sessionOpen={}",
195 deviceId, requestedToBeMaster.get(),
196 pendingElectionId, masterElectionId,
197 streamChannelManager.isOpen());
198 }
Carmelo Cascone4b616312019-04-17 14:15:45 -0700199 // Optimistically set the reported master status, if wrong, it
200 // will be updated by the arbitration response. This alleviates
201 // race conditions when calling isMaster() right after setting
202 // mastership.
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800203 sendMasterArbitrationUpdate(pendingElectionId);
Carmelo Cascone4b616312019-04-17 14:15:45 -0700204 isMaster.set(requestedToBeMaster.get());
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800205 pendingElectionId = null;
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700206 pendingElectionIdTimestamp = 0;
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800207 // No need to listen for master election ID changes.
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700208 masterElectionIdStore.unsetListener(deviceId, p4DeviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800209 }
210 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800211 }
212
213 @Override
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700214 public boolean isMaster(long p4DeviceId) {
215 checkArgument(this.p4DeviceId == p4DeviceId);
Carmelo Cascone4b616312019-04-17 14:15:45 -0700216 synchronized (requestedToBeMaster) {
217 return isMaster.get();
218 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800219 }
220
221 @Override
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700222 public void packetOut(long p4DeviceId, PiPacketOperation packet, PiPipeconf pipeconf) {
223 checkArgument(this.p4DeviceId == p4DeviceId);
224 if (!isSessionOpen(p4DeviceId)) {
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700225 log.warn("Dropping packet-out request for {}, session is closed",
226 deviceId);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800227 return;
228 }
229 if (log.isTraceEnabled()) {
230 log.trace("Sending packet-out to {}: {}", deviceId, packet);
231 }
232 try {
233 // Encode the PiPacketOperation into a PacketOut
234 final P4RuntimeOuterClass.PacketOut packetOut =
235 CODECS.packetOut().encode(packet, null, pipeconf);
236 // Build the request
237 final StreamMessageRequest packetOutRequest = StreamMessageRequest
238 .newBuilder().setPacket(packetOut).build();
239 // Send.
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800240 streamChannelManager.send(packetOutRequest);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800241 } catch (CodecException e) {
242 log.error("Unable to send packet-out: {}", e.getMessage());
243 }
244 }
245
246 private void sendMasterArbitrationUpdate(BigInteger electionId) {
247 log.debug("Sending arbitration update to {}... electionId={}",
248 deviceId, electionId);
249 final P4RuntimeOuterClass.Uint128 idMsg = bigIntegerToUint128(electionId);
250 streamChannelManager.send(
251 StreamMessageRequest.newBuilder()
252 .setArbitration(
253 P4RuntimeOuterClass.MasterArbitrationUpdate
254 .newBuilder()
255 .setDeviceId(p4DeviceId)
256 .setElectionId(idMsg)
257 .build())
258 .build());
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800259 lastUsedElectionId = electionId;
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800260 }
261
262 private P4RuntimeOuterClass.Uint128 bigIntegerToUint128(BigInteger value) {
263 final byte[] arr = value.toByteArray();
264 final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
265 .put(new byte[Long.BYTES * 2 - arr.length])
266 .put(arr);
267 bb.rewind();
268 return P4RuntimeOuterClass.Uint128.newBuilder()
269 .setHigh(bb.getLong())
270 .setLow(bb.getLong())
271 .build();
272 }
273
274 private BigInteger uint128ToBigInteger(P4RuntimeOuterClass.Uint128 value) {
275 return new BigInteger(
276 ByteBuffer.allocate(Long.BYTES * 2)
277 .putLong(value.getHigh())
278 .putLong(value.getLow())
279 .array());
280 }
281
282 private void handlePacketIn(P4RuntimeOuterClass.PacketIn packetInMsg) {
283 if (log.isTraceEnabled()) {
284 log.trace("Received packet-in from {}: {}", deviceId, packetInMsg);
285 }
286 if (!pipeconfService.getPipeconf(deviceId).isPresent()) {
287 log.warn("Unable to handle packet-in from {}, missing pipeconf: {}",
288 deviceId, TextFormat.shortDebugString(packetInMsg));
289 return;
290 }
291 // Decode packet message and post event.
292 // TODO: consider implementing a cache to speed up
293 // encoding/deconding of packet-in/out (e.g. LLDP, ARP)
294 final PiPipeconf pipeconf = pipeconfService.getPipeconf(deviceId).get();
295 final PiPacketOperation pktOperation;
296 try {
297 pktOperation = CODECS.packetIn().decode(
298 packetInMsg, null, pipeconf);
299 } catch (CodecException e) {
300 log.warn("Unable to process packet-int: {}", e.getMessage());
301 return;
302 }
303 controller.postEvent(new P4RuntimeEvent(
304 P4RuntimeEvent.Type.PACKET_IN,
305 new PacketInEvent(deviceId, pktOperation)));
306 }
307
308 private void handleArbitrationUpdate(P4RuntimeOuterClass.MasterArbitrationUpdate msg) {
309 // From the spec...
310 // - Election_id: The stream RPC with the highest election_id is the
311 // master. Switch populates with the highest election ID it
312 // has received from all connected controllers.
313 // - Status: Switch populates this with OK for the client that is the
314 // master, and with an error status for all other connected clients (at
315 // every mastership change).
316 if (!msg.hasElectionId() || !msg.hasStatus()) {
317 return;
318 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800319 // Is this client master?
320 isMaster.set(msg.getStatus().getCode() == Status.OK.getCode().value());
321 // Notify new master election IDs to all nodes via distributed store.
322 // This is required for those nodes who do not have a Stream RPC open,
323 // and that otherwise would not be aware of changes, keeping their
324 // pending mastership operations forever.
325 final BigInteger masterElectionId = uint128ToBigInteger(msg.getElectionId());
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700326 masterElectionIdStore.set(deviceId, p4DeviceId, masterElectionId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800327
328 log.debug("Received arbitration update from {}: isMaster={}, masterElectionId={}",
329 deviceId, isMaster.get(), masterElectionId);
330
331 // Post mastership event via controller.
332 controller.postEvent(new DeviceAgentEvent(
333 isMaster.get() ? DeviceAgentEvent.Type.ROLE_MASTER
334 : DeviceAgentEvent.Type.ROLE_STANDBY, deviceId));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800335 }
336
337 /**
338 * Returns the election ID last used in a MasterArbitrationUpdate message
339 * sent by the client to the server.
340 *
341 * @return election ID uint128 protobuf message
342 */
343 P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800344 return lastUsedElectionId == null
345 ? P4RuntimeOuterClass.Uint128.getDefaultInstance()
346 : bigIntegerToUint128(lastUsedElectionId);
347 }
348
349 /**
350 * Handles updates of the master election ID by applying any pending
351 * mastership operation.
352 */
353 private class InternalMasterElectionIdListener
354 implements MasterElectionIdStore.MasterElectionIdListener {
355
356 @Override
357 public void updated(BigInteger masterElectionId) {
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700358 log.debug("UPDATED master election ID of {}: {}",
359 deviceId, masterElectionId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800360 handlePendingElectionId(masterElectionId);
361 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800362 }
363
364 /**
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700365 * A manager for the P4Runtime StreamChannel RPC that opportunistically creates
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800366 * new stream RCP stubs (e.g. when one fails because of errors) and posts
367 * channel events via the P4Runtime controller.
368 */
369 private final class StreamChannelManager {
370
371 private final AtomicBoolean open = new AtomicBoolean(false);
372 private final StreamObserver<StreamMessageResponse> responseObserver =
373 new InternalStreamResponseObserver(this);
374 private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
375
376 void send(StreamMessageRequest value) {
377 synchronized (this) {
378 initIfRequired();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800379 requestObserver.onNext(value);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800380 // Optimistically set the session as open. In case of errors, it
381 // will be closed by the response stream observer.
382 streamChannelManager.signalOpen();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800383 }
384 }
385
386 private void initIfRequired() {
387 if (requestObserver == null) {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700388 log.debug("Starting new StreamChannel RPC for {}...", deviceId);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800389 open.set(false);
390 client.execRpcNoTimeout(
391 s -> requestObserver =
392 (ClientCallStreamObserver<StreamMessageRequest>)
393 s.streamChannel(responseObserver)
394 );
395 }
396 }
397
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800398 void teardown() {
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800399 synchronized (this) {
400 signalClosed();
401 if (requestObserver != null) {
402 requestObserver.onCompleted();
403 requestObserver.cancel("Completed", null);
404 requestObserver = null;
405 }
406 }
407 }
408
409 void signalOpen() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800410 open.set(true);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800411 }
412
413 void signalClosed() {
414 synchronized (this) {
415 final boolean wasOpen = open.getAndSet(false);
Carmelo Cascone4b616312019-04-17 14:15:45 -0700416 if (wasOpen) {
417 // We lost any valid mastership role.
418 controller.postEvent(new DeviceAgentEvent(
419 DeviceAgentEvent.Type.ROLE_NONE, deviceId));
420 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800421 }
422 }
423
424 boolean isOpen() {
425 return open.get();
426 }
427 }
428
429 /**
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700430 * Handles messages received from the device on the StreamChannel RPC.
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800431 */
432 private final class InternalStreamResponseObserver
433 implements StreamObserver<StreamMessageResponse> {
434
435 private final StreamChannelManager streamChannelManager;
436
437 private InternalStreamResponseObserver(
438 StreamChannelManager streamChannelManager) {
439 this.streamChannelManager = streamChannelManager;
440 }
441
442 @Override
443 public void onNext(StreamMessageResponse message) {
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800444 try {
445 if (log.isTraceEnabled()) {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800446 log.trace("Received {} from {}: {}",
447 message.getUpdateCase(), deviceId,
448 TextFormat.shortDebugString(message));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800449 }
450 switch (message.getUpdateCase()) {
451 case PACKET:
452 handlePacketIn(message.getPacket());
453 return;
454 case ARBITRATION:
455 handleArbitrationUpdate(message.getArbitration());
456 return;
Wailok Shum741d50d2021-12-15 23:11:15 +0800457 case ERROR:
458 P4RuntimeOuterClass.StreamError error = message.getError();
459 log.warn("Receive stream error {} from {} Canonical Code: {} Message: {} Space: {} Code: {}",
460 error.getDetailsCase(), deviceId, error.getCanonicalCode(), error.getMessage(),
461 error.getSpace(), error.getCode());
462 return;
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800463 default:
464 log.warn("Unrecognized StreamMessageResponse from {}: {}",
465 deviceId, message.getUpdateCase());
466 }
467 } catch (Throwable ex) {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700468 log.error("Exception while processing StreamMessageResponse from {}",
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800469 deviceId, ex);
470 }
471 }
472
473 @Override
474 public void onError(Throwable throwable) {
475 if (throwable instanceof StatusRuntimeException) {
476 final StatusRuntimeException sre = (StatusRuntimeException) throwable;
477 if (sre.getStatus().getCause() instanceof ConnectException) {
478 log.warn("{} is unreachable ({})",
479 deviceId, sre.getCause().getMessage());
480 } else {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700481 log.warn("Error on StreamChannel RPC for {}: {}",
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800482 deviceId, throwable.getMessage());
483 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800484 log.debug("", throwable);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800485 } else {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700486 log.error(format("Exception on StreamChannel RPC for %s",
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800487 deviceId), throwable);
488 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800489 streamChannelManager.teardown();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800490 }
491
492 @Override
493 public void onCompleted() {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700494 log.warn("StreamChannel RPC for {} has completed", deviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800495 streamChannelManager.teardown();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800496 }
497 }
498}