blob: cf3cda09ecad89bcc2888dd25ee9c4f91b7c9cd6 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.p4runtime.ctl.client;
18
19import com.google.protobuf.TextFormat;
20import io.grpc.Status;
21import io.grpc.StatusRuntimeException;
22import io.grpc.stub.ClientCallStreamObserver;
23import io.grpc.stub.StreamObserver;
Carmelo Casconeb8a25052019-04-08 15:22:32 -070024import org.onlab.util.SharedScheduledExecutors;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080025import org.onosproject.net.DeviceId;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080026import org.onosproject.net.device.DeviceAgentEvent;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080027import org.onosproject.net.pi.model.PiPipeconf;
28import org.onosproject.net.pi.runtime.PiPacketOperation;
29import org.onosproject.net.pi.service.PiPipeconfService;
30import org.onosproject.p4runtime.api.P4RuntimeEvent;
31import org.onosproject.p4runtime.api.P4RuntimeStreamClient;
32import org.onosproject.p4runtime.ctl.codec.CodecException;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080033import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
34import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore.MasterElectionIdListener;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080035import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
36import org.onosproject.p4runtime.ctl.controller.PacketInEvent;
37import org.slf4j.Logger;
38import p4.v1.P4RuntimeOuterClass;
39import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
40import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
41
42import java.math.BigInteger;
43import java.net.ConnectException;
44import java.nio.ByteBuffer;
Carmelo Casconeb8a25052019-04-08 15:22:32 -070045import java.util.concurrent.ScheduledFuture;
46import java.util.concurrent.TimeUnit;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080047import java.util.concurrent.atomic.AtomicBoolean;
48
Carmelo Cascone3977ea42019-02-28 13:43:42 -080049import static com.google.common.base.Preconditions.checkArgument;
50import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080051import static java.lang.String.format;
Carmelo Casconeb8a25052019-04-08 15:22:32 -070052import static java.lang.System.currentTimeMillis;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080053import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
54import static org.slf4j.LoggerFactory.getLogger;
55
56/**
57 * Implementation of P4RuntimeStreamClient. Handles P4Runtime StreamChannel RPC
Carmelo Casconec2be50a2019-04-10 00:15:39 -070058 * operations, such as arbitration update and packet-in/out, for a given
59 * P4Runtime-internal device ID.
Carmelo Cascone4c289b72019-01-22 15:30:45 -080060 */
61public final class StreamClientImpl implements P4RuntimeStreamClient {
62
63 private static final Logger log = getLogger(StreamClientImpl.class);
64
Carmelo Casconeb8a25052019-04-08 15:22:32 -070065 private static final int ARBITRATION_RETRY_SECONDS = 3;
66 private static final int ARBITRATION_TIMEOUT_SECONDS = 15;
67
Carmelo Cascone4c289b72019-01-22 15:30:45 -080068 private final P4RuntimeClientImpl client;
69 private final DeviceId deviceId;
70 private final long p4DeviceId;
71 private final PiPipeconfService pipeconfService;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080072 private final MasterElectionIdStore masterElectionIdStore;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080073 private final P4RuntimeControllerImpl controller;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080074
Carmelo Cascone4c289b72019-01-22 15:30:45 -080075 private final StreamChannelManager streamChannelManager = new StreamChannelManager();
Carmelo Cascone3977ea42019-02-28 13:43:42 -080076 private final MasterElectionIdListener masterElectionIdListener = new InternalMasterElectionIdListener();
Carmelo Cascone4c289b72019-01-22 15:30:45 -080077
Carmelo Cascone3977ea42019-02-28 13:43:42 -080078 private final AtomicBoolean isMaster = new AtomicBoolean(false);
79 private final AtomicBoolean requestedToBeMaster = new AtomicBoolean(false);
Carmelo Cascone4c289b72019-01-22 15:30:45 -080080
Carmelo Cascone3977ea42019-02-28 13:43:42 -080081 private BigInteger pendingElectionId = null;
82 private BigInteger lastUsedElectionId = null;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080083
Carmelo Casconeb8a25052019-04-08 15:22:32 -070084 private ScheduledFuture<?> pendingElectionIdRetryTask = null;
85 private long pendingElectionIdTimestamp = 0;
86
Carmelo Cascone4c289b72019-01-22 15:30:45 -080087 StreamClientImpl(
88 PiPipeconfService pipeconfService,
Carmelo Cascone3977ea42019-02-28 13:43:42 -080089 MasterElectionIdStore masterElectionIdStore,
Carmelo Cascone4c289b72019-01-22 15:30:45 -080090 P4RuntimeClientImpl client,
Carmelo Casconec2be50a2019-04-10 00:15:39 -070091 long p4DeviceId,
Carmelo Cascone4c289b72019-01-22 15:30:45 -080092 P4RuntimeControllerImpl controller) {
93 this.client = client;
94 this.deviceId = client.deviceId();
Carmelo Casconec2be50a2019-04-10 00:15:39 -070095 this.p4DeviceId = p4DeviceId;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080096 this.pipeconfService = pipeconfService;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080097 this.masterElectionIdStore = masterElectionIdStore;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080098 this.controller = controller;
99 }
100
101 @Override
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700102 public boolean isSessionOpen(long p4DeviceId) {
103 checkArgument(this.p4DeviceId == p4DeviceId);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800104 return streamChannelManager.isOpen();
105 }
106
107 @Override
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700108 public void closeSession(long p4DeviceId) {
109 checkArgument(this.p4DeviceId == p4DeviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800110 synchronized (requestedToBeMaster) {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700111 this.masterElectionIdStore.unsetListener(deviceId, p4DeviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800112 streamChannelManager.teardown();
113 pendingElectionId = null;
114 requestedToBeMaster.set(false);
115 isMaster.set(false);
116 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800117 }
118
119 @Override
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700120 public void setMastership(long p4DeviceId, boolean master,
121 BigInteger newElectionId) {
122 checkArgument(this.p4DeviceId == p4DeviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800123 checkNotNull(newElectionId);
124 checkArgument(newElectionId.compareTo(BigInteger.ZERO) > 0,
125 "newElectionId must be a non zero positive number");
126 synchronized (requestedToBeMaster) {
127 requestedToBeMaster.set(master);
128 pendingElectionId = newElectionId;
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700129 handlePendingElectionId(masterElectionIdStore.get(
130 deviceId, p4DeviceId));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800131 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800132 }
133
134 private void handlePendingElectionId(BigInteger masterElectionId) {
135 synchronized (requestedToBeMaster) {
136 if (pendingElectionId == null) {
137 // No pending requests.
138 return;
139 }
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700140 // Cancel any pending task. We'll reschedule if needed.
141 if (pendingElectionIdRetryTask != null) {
142 // Do not interrupt if running, as this might be executed by the
143 // pending task itself.
144 pendingElectionIdRetryTask.cancel(false);
145 pendingElectionIdRetryTask = null;
146 }
147 // We implement a deferring mechanism to avoid becoming master when
148 // we shouldn't, i.e. when the requested election ID is bigger than
149 // the master one on the device, but we don't want to be master.
150 // However, we make sure not to defer for more than
151 // ARBITRATION_TIMEOUT_SECONDS.
152 final boolean timeoutExpired;
153 if (pendingElectionIdTimestamp == 0) {
154 pendingElectionIdTimestamp = currentTimeMillis();
155 timeoutExpired = false;
156 } else {
157 timeoutExpired = (currentTimeMillis() - pendingElectionIdTimestamp)
158 > ARBITRATION_TIMEOUT_SECONDS * 1000;
159 }
160 if (timeoutExpired) {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700161 log.warn("Arbitration timeout expired for {}! " +
162 "Will send pending election ID now...",
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700163 deviceId);
164 }
165 if (!timeoutExpired &&
166 !requestedToBeMaster.get() && masterElectionId != null &&
167 pendingElectionId.compareTo(masterElectionId) > 0) {
168 log.info("Deferring sending master arbitration update for {}, master " +
169 "election ID of server ({}) is smaller than " +
170 "requested one ({}), but we do NOT want to be master...",
171 deviceId, masterElectionId, pendingElectionId);
172 // Will try again as soon as the master election ID store is
173 // updated...
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700174 masterElectionIdStore.setListener(
175 deviceId, p4DeviceId, masterElectionIdListener);
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700176 // ..or in ARBITRATION_RETRY_SECONDS at the latest (if we missed
177 // the store event).
178 pendingElectionIdRetryTask = SharedScheduledExecutors.newTimeout(
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700179 () -> handlePendingElectionId(
180 masterElectionIdStore.get(deviceId, p4DeviceId)),
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700181 ARBITRATION_RETRY_SECONDS, TimeUnit.SECONDS);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800182 } else {
183 // Send now.
184 log.info("Setting mastership on {}... " +
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700185 "master={}, newElectionId={}, " +
186 "masterElectionId={}, sessionOpen={}",
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700187 deviceId, requestedToBeMaster.get(),
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700188 pendingElectionId, masterElectionId,
189 streamChannelManager.isOpen());
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800190 sendMasterArbitrationUpdate(pendingElectionId);
191 pendingElectionId = null;
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700192 pendingElectionIdTimestamp = 0;
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800193 // No need to listen for master election ID changes.
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700194 masterElectionIdStore.unsetListener(deviceId, p4DeviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800195 }
196 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800197 }
198
199 @Override
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700200 public boolean isMaster(long p4DeviceId) {
201 checkArgument(this.p4DeviceId == p4DeviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800202 return isMaster.get();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800203 }
204
205 @Override
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700206 public void packetOut(long p4DeviceId, PiPacketOperation packet, PiPipeconf pipeconf) {
207 checkArgument(this.p4DeviceId == p4DeviceId);
208 if (!isSessionOpen(p4DeviceId)) {
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700209 log.warn("Dropping packet-out request for {}, session is closed",
210 deviceId);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800211 return;
212 }
213 if (log.isTraceEnabled()) {
214 log.trace("Sending packet-out to {}: {}", deviceId, packet);
215 }
216 try {
217 // Encode the PiPacketOperation into a PacketOut
218 final P4RuntimeOuterClass.PacketOut packetOut =
219 CODECS.packetOut().encode(packet, null, pipeconf);
220 // Build the request
221 final StreamMessageRequest packetOutRequest = StreamMessageRequest
222 .newBuilder().setPacket(packetOut).build();
223 // Send.
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800224 streamChannelManager.send(packetOutRequest);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800225 } catch (CodecException e) {
226 log.error("Unable to send packet-out: {}", e.getMessage());
227 }
228 }
229
230 private void sendMasterArbitrationUpdate(BigInteger electionId) {
231 log.debug("Sending arbitration update to {}... electionId={}",
232 deviceId, electionId);
233 final P4RuntimeOuterClass.Uint128 idMsg = bigIntegerToUint128(electionId);
234 streamChannelManager.send(
235 StreamMessageRequest.newBuilder()
236 .setArbitration(
237 P4RuntimeOuterClass.MasterArbitrationUpdate
238 .newBuilder()
239 .setDeviceId(p4DeviceId)
240 .setElectionId(idMsg)
241 .build())
242 .build());
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800243 lastUsedElectionId = electionId;
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800244 }
245
246 private P4RuntimeOuterClass.Uint128 bigIntegerToUint128(BigInteger value) {
247 final byte[] arr = value.toByteArray();
248 final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
249 .put(new byte[Long.BYTES * 2 - arr.length])
250 .put(arr);
251 bb.rewind();
252 return P4RuntimeOuterClass.Uint128.newBuilder()
253 .setHigh(bb.getLong())
254 .setLow(bb.getLong())
255 .build();
256 }
257
258 private BigInteger uint128ToBigInteger(P4RuntimeOuterClass.Uint128 value) {
259 return new BigInteger(
260 ByteBuffer.allocate(Long.BYTES * 2)
261 .putLong(value.getHigh())
262 .putLong(value.getLow())
263 .array());
264 }
265
266 private void handlePacketIn(P4RuntimeOuterClass.PacketIn packetInMsg) {
267 if (log.isTraceEnabled()) {
268 log.trace("Received packet-in from {}: {}", deviceId, packetInMsg);
269 }
270 if (!pipeconfService.getPipeconf(deviceId).isPresent()) {
271 log.warn("Unable to handle packet-in from {}, missing pipeconf: {}",
272 deviceId, TextFormat.shortDebugString(packetInMsg));
273 return;
274 }
275 // Decode packet message and post event.
276 // TODO: consider implementing a cache to speed up
277 // encoding/deconding of packet-in/out (e.g. LLDP, ARP)
278 final PiPipeconf pipeconf = pipeconfService.getPipeconf(deviceId).get();
279 final PiPacketOperation pktOperation;
280 try {
281 pktOperation = CODECS.packetIn().decode(
282 packetInMsg, null, pipeconf);
283 } catch (CodecException e) {
284 log.warn("Unable to process packet-int: {}", e.getMessage());
285 return;
286 }
287 controller.postEvent(new P4RuntimeEvent(
288 P4RuntimeEvent.Type.PACKET_IN,
289 new PacketInEvent(deviceId, pktOperation)));
290 }
291
292 private void handleArbitrationUpdate(P4RuntimeOuterClass.MasterArbitrationUpdate msg) {
293 // From the spec...
294 // - Election_id: The stream RPC with the highest election_id is the
295 // master. Switch populates with the highest election ID it
296 // has received from all connected controllers.
297 // - Status: Switch populates this with OK for the client that is the
298 // master, and with an error status for all other connected clients (at
299 // every mastership change).
300 if (!msg.hasElectionId() || !msg.hasStatus()) {
301 return;
302 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800303 // Is this client master?
304 isMaster.set(msg.getStatus().getCode() == Status.OK.getCode().value());
305 // Notify new master election IDs to all nodes via distributed store.
306 // This is required for those nodes who do not have a Stream RPC open,
307 // and that otherwise would not be aware of changes, keeping their
308 // pending mastership operations forever.
309 final BigInteger masterElectionId = uint128ToBigInteger(msg.getElectionId());
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700310 masterElectionIdStore.set(deviceId, p4DeviceId, masterElectionId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800311
312 log.debug("Received arbitration update from {}: isMaster={}, masterElectionId={}",
313 deviceId, isMaster.get(), masterElectionId);
314
315 // Post mastership event via controller.
316 controller.postEvent(new DeviceAgentEvent(
317 isMaster.get() ? DeviceAgentEvent.Type.ROLE_MASTER
318 : DeviceAgentEvent.Type.ROLE_STANDBY, deviceId));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800319 }
320
321 /**
322 * Returns the election ID last used in a MasterArbitrationUpdate message
323 * sent by the client to the server.
324 *
325 * @return election ID uint128 protobuf message
326 */
327 P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800328 return lastUsedElectionId == null
329 ? P4RuntimeOuterClass.Uint128.getDefaultInstance()
330 : bigIntegerToUint128(lastUsedElectionId);
331 }
332
333 /**
334 * Handles updates of the master election ID by applying any pending
335 * mastership operation.
336 */
337 private class InternalMasterElectionIdListener
338 implements MasterElectionIdStore.MasterElectionIdListener {
339
340 @Override
341 public void updated(BigInteger masterElectionId) {
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700342 log.debug("UPDATED master election ID of {}: {}",
343 deviceId, masterElectionId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800344 handlePendingElectionId(masterElectionId);
345 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800346 }
347
348 /**
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700349 * A manager for the P4Runtime StreamChannel RPC that opportunistically creates
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800350 * new stream RCP stubs (e.g. when one fails because of errors) and posts
351 * channel events via the P4Runtime controller.
352 */
353 private final class StreamChannelManager {
354
355 private final AtomicBoolean open = new AtomicBoolean(false);
356 private final StreamObserver<StreamMessageResponse> responseObserver =
357 new InternalStreamResponseObserver(this);
358 private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
359
360 void send(StreamMessageRequest value) {
361 synchronized (this) {
362 initIfRequired();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800363 requestObserver.onNext(value);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800364 // Optimistically set the session as open. In case of errors, it
365 // will be closed by the response stream observer.
366 streamChannelManager.signalOpen();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800367 }
368 }
369
370 private void initIfRequired() {
371 if (requestObserver == null) {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700372 log.debug("Starting new StreamChannel RPC for {}...", deviceId);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800373 open.set(false);
374 client.execRpcNoTimeout(
375 s -> requestObserver =
376 (ClientCallStreamObserver<StreamMessageRequest>)
377 s.streamChannel(responseObserver)
378 );
379 }
380 }
381
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800382 void teardown() {
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800383 synchronized (this) {
384 signalClosed();
385 if (requestObserver != null) {
386 requestObserver.onCompleted();
387 requestObserver.cancel("Completed", null);
388 requestObserver = null;
389 }
390 }
391 }
392
393 void signalOpen() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800394 open.set(true);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800395 }
396
397 void signalClosed() {
398 synchronized (this) {
399 final boolean wasOpen = open.getAndSet(false);
Carmelo Casconed51a5552019-04-13 01:22:25 -0700400 // FIXME: in case of device disconnection, all clients will
401 // signal role NONE, preventing the DeviceManager to mark the
402 // device as offline, as only the master can do that. We should
403 // change the DeviceManager. For now, we disable signaling role
404 // NONE.
405 // if (wasOpen) {
406 // // We lost any valid mastership role.
407 // controller.postEvent(new DeviceAgentEvent(
408 // DeviceAgentEvent.Type.ROLE_NONE, deviceId));
409 // }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800410 }
411 }
412
413 boolean isOpen() {
414 return open.get();
415 }
416 }
417
418 /**
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700419 * Handles messages received from the device on the StreamChannel RPC.
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800420 */
421 private final class InternalStreamResponseObserver
422 implements StreamObserver<StreamMessageResponse> {
423
424 private final StreamChannelManager streamChannelManager;
425
426 private InternalStreamResponseObserver(
427 StreamChannelManager streamChannelManager) {
428 this.streamChannelManager = streamChannelManager;
429 }
430
431 @Override
432 public void onNext(StreamMessageResponse message) {
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800433 try {
434 if (log.isTraceEnabled()) {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800435 log.trace("Received {} from {}: {}",
436 message.getUpdateCase(), deviceId,
437 TextFormat.shortDebugString(message));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800438 }
439 switch (message.getUpdateCase()) {
440 case PACKET:
441 handlePacketIn(message.getPacket());
442 return;
443 case ARBITRATION:
444 handleArbitrationUpdate(message.getArbitration());
445 return;
446 default:
447 log.warn("Unrecognized StreamMessageResponse from {}: {}",
448 deviceId, message.getUpdateCase());
449 }
450 } catch (Throwable ex) {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700451 log.error("Exception while processing StreamMessageResponse from {}",
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800452 deviceId, ex);
453 }
454 }
455
456 @Override
457 public void onError(Throwable throwable) {
458 if (throwable instanceof StatusRuntimeException) {
459 final StatusRuntimeException sre = (StatusRuntimeException) throwable;
460 if (sre.getStatus().getCause() instanceof ConnectException) {
461 log.warn("{} is unreachable ({})",
462 deviceId, sre.getCause().getMessage());
463 } else {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700464 log.warn("Error on StreamChannel RPC for {}: {}",
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800465 deviceId, throwable.getMessage());
466 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800467 log.debug("", throwable);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800468 } else {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700469 log.error(format("Exception on StreamChannel RPC for %s",
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800470 deviceId), throwable);
471 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800472 streamChannelManager.teardown();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800473 }
474
475 @Override
476 public void onCompleted() {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700477 log.warn("StreamChannel RPC for {} has completed", deviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800478 streamChannelManager.teardown();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800479 }
480 }
481}