/*
 * Copyright 2017-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.p4runtime.ctl;

import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.onlab.osgi.DefaultServiceDirectory;
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.runtime.PiPacketOperation;
import org.onosproject.net.pi.runtime.PiPipeconfService;
import org.onosproject.net.pi.runtime.PiTableEntry;
import org.onosproject.net.pi.runtime.PiTableId;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.slf4j.Logger;
import p4.P4RuntimeGrpc;
import p4.P4RuntimeOuterClass.Entity;
import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
import p4.P4RuntimeOuterClass.PacketIn;
import p4.P4RuntimeOuterClass.ReadRequest;
import p4.P4RuntimeOuterClass.ReadResponse;
import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
import p4.P4RuntimeOuterClass.StreamMessageRequest;
import p4.P4RuntimeOuterClass.StreamMessageResponse;
import p4.P4RuntimeOuterClass.TableEntry;
import p4.P4RuntimeOuterClass.Update;
import p4.P4RuntimeOuterClass.WriteRequest;
import p4.config.P4InfoOuterClass.P4Info;
import p4.tmp.P4Config;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType;
import static org.slf4j.LoggerFactory.getLogger;
import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
import static p4.P4RuntimeOuterClass.PacketOut;
import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;

77/**
78 * Implementation of a P4Runtime client.
79 */
Carmelo Cascone8d99b172017-07-18 17:26:31 -040080public final class P4RuntimeClientImpl implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040081
82 private static final int DEADLINE_SECONDS = 15;
83
Carmelo Cascone8d99b172017-07-18 17:26:31 -040084 // FIXME: use static election ID, since mastership arbitration is not yet support on BMv2 or Tofino.
85 private static final int ELECTION_ID = 1;
86
87 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
88 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
89 WriteOperationType.INSERT, Update.Type.INSERT,
90 WriteOperationType.MODIFY, Update.Type.MODIFY,
91 WriteOperationType.DELETE, Update.Type.DELETE
92 );
93
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040094 private final Logger log = getLogger(getClass());
95
96 private final DeviceId deviceId;
97 private final int p4DeviceId;
98 private final P4RuntimeControllerImpl controller;
99 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400100 private final Context.CancellableContext cancellableContext;
101 private final ExecutorService executorService;
102 private final Executor contextExecutor;
103 private final Lock writeLock = new ReentrantLock();
104 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400105
106
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400107 P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400108 this.deviceId = deviceId;
109 this.p4DeviceId = p4DeviceId;
110 this.controller = controller;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400111 this.cancellableContext = Context.current().withCancellation();
Carmelo Casconea966c342017-07-30 01:56:30 -0400112 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400113 "onos/p4runtime-client-" + deviceId.toString(),
114 deviceId.toString() + "-%d"));
115 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200116 //TODO Investigate deadline or timeout in supplyInContext Method
117 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400118 P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
119 this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
120 }
121
122 /**
123 * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
124 * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
125 * <p>
126 * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
127 * <p>
128 */
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200129 private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400130 return CompletableFuture.supplyAsync(() -> {
131 // TODO: explore a more relaxed locking strategy.
132 writeLock.lock();
133 try {
134 return supplier.get();
Carmelo Casconea966c342017-07-30 01:56:30 -0400135 } catch (Throwable ex) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200136 log.error("Exception in P4Runtime client of {}, executing {}", deviceId, opDescription, ex);
Carmelo Casconea966c342017-07-30 01:56:30 -0400137 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400138 } finally {
139 writeLock.unlock();
140 }
141 }, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400142 }
143
144 @Override
145 public CompletableFuture<Boolean> initStreamChannel() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200146 return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400147 }
148
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400149 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400150 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200151 return supplyInContext(() -> doSetPipelineConfig(pipeconf, targetConfigExtType), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400152 }
153
154 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400155 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
156 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200157 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
158 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400159 }
160
161 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400162 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200163 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400164 }
165
166 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200167 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200168 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200169 }
170
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400171 /* Blocking method implementations below */
172
173 private boolean doInitStreamChannel() {
174 // To listen for packets and other events, we need to start the RPC.
175 // Here we do it by sending a master arbitration update.
176 log.info("initializing stream chanel on {}...", deviceId);
177 if (!doArbitrationUpdate()) {
178 log.warn("Unable to initialize stream channel for {}", deviceId);
179 return false;
180 } else {
181 return true;
182 }
183 }
184
185 private boolean doArbitrationUpdate() {
186 log.info("Sending arbitration update to {}...", deviceId);
187 StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
188 .setArbitration(MasterArbitrationUpdate.newBuilder()
189 .setDeviceId(p4DeviceId)
190 .build())
191 .build();
192 try {
193 streamRequestObserver.onNext(requestMsg);
194 return true;
195 } catch (StatusRuntimeException e) {
196 log.warn("Arbitration update failed for {}: {}", deviceId, e);
197 return false;
198 }
199 }
200
201 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
202
203 log.info("Setting pipeline config for {} to {} using {}...", deviceId, pipeconf.id(), targetConfigExtType);
204
205 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
206 if (p4Info == null) {
207 // Problem logged by PipeconfHelper.
208 return false;
209 }
210
211 if (!pipeconf.extension(targetConfigExtType).isPresent()) {
212 log.warn("Missing extension {} in pipeconf {}", targetConfigExtType, pipeconf.id());
213 return false;
214 }
215
216 InputStream targetConfig = pipeconf.extension(targetConfigExtType).get();
217 P4Config.P4DeviceConfig p4DeviceConfigMsg;
218 try {
219 p4DeviceConfigMsg = P4Config.P4DeviceConfig
220 .newBuilder()
221 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
222 .setReassign(true)
223 .setDeviceData(ByteString.readFrom(targetConfig))
224 .build();
225 } catch (IOException ex) {
226 log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
227 return false;
228 }
229
230 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
231 .newBuilder()
232 .setAction(VERIFY_AND_COMMIT)
233 .addConfigs(ForwardingPipelineConfig
234 .newBuilder()
235 .setDeviceId(p4DeviceId)
236 .setP4Info(p4Info)
237 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
238 .build())
239 .build();
240
241 try {
242 this.blockingStub.setForwardingPipelineConfig(request);
243
244 } catch (StatusRuntimeException ex) {
245 log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
246 return false;
247 }
248
249 return true;
250 }
251
252 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
253 PiPipeconf pipeconf) {
254
255 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
256
257 Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
258 .stream()
259 .map(tableEntryMsg ->
260 Update.newBuilder()
261 .setEntity(Entity.newBuilder()
262 .setTableEntry(tableEntryMsg)
263 .build())
264 .setType(UPDATE_TYPES.get(opType))
265 .build())
266 .collect(Collectors.toList());
267
268 if (updateMsgs.size() == 0) {
269 return true;
270 }
271
272 writeRequestBuilder
273 .setDeviceId(p4DeviceId)
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200274 /* PI ignores this ElectionId, commenting out for now.
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400275 .setElectionId(Uint128.newBuilder()
276 .setHigh(0)
277 .setLow(ELECTION_ID)
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200278 .build()) */
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400279 .addAllUpdates(updateMsgs)
280 .build();
281
282 try {
283 blockingStub.write(writeRequestBuilder.build());
284 return true;
285 } catch (StatusRuntimeException e) {
286 log.warn("Unable to write table entries ({}): {}", opType, e.getMessage());
287 return false;
288 }
289 }
290
291 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
292
293 log.info("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
294
295 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
296 int tableId;
297 try {
298 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
299 } catch (P4InfoBrowser.NotFoundException e) {
300 log.warn("Unable to dump table: {}", e.getMessage());
301 return Collections.emptyList();
302 }
303
304 ReadRequest requestMsg = ReadRequest.newBuilder()
305 .setDeviceId(p4DeviceId)
306 .addEntities(Entity.newBuilder()
307 .setTableEntry(TableEntry.newBuilder()
308 .setTableId(tableId)
309 .build())
310 .build())
311 .build();
312
313 Iterator<ReadResponse> responses;
314 try {
315 responses = blockingStub.read(requestMsg);
316 } catch (StatusRuntimeException e) {
317 log.warn("Unable to dump table: {}", e.getMessage());
318 return Collections.emptyList();
319 }
320
321 Iterable<ReadResponse> responseIterable = () -> responses;
322 List<TableEntry> tableEntryMsgs = StreamSupport
323 .stream(responseIterable.spliterator(), false)
324 .map(ReadResponse::getEntitiesList)
325 .flatMap(List::stream)
326 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
327 .map(Entity::getTableEntry)
328 .collect(Collectors.toList());
329
330 log.info("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
331
332 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
333 }
334
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200335 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
336 try {
337 //encode the PiPacketOperation into a PacketOut
338 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
339
340 //Build the request
341 StreamMessageRequest packetOutRequest = StreamMessageRequest
342 .newBuilder().setPacket(packetOut).build();
343
344 //Send the request
345 streamRequestObserver.onNext(packetOutRequest);
346
347 } catch (P4InfoBrowser.NotFoundException e) {
348 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
349 log.debug("Exception", e);
350 return false;
351 }
352 return true;
353 }
354
Carmelo Casconea966c342017-07-30 01:56:30 -0400355 private void doPacketIn(PacketIn packetInMsg) {
356
357 // Retrieve the pipeconf for this client's device.
358 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
359 if (pipeconfService == null) {
360 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
361 }
362 final PiPipeconf pipeconf;
363 if (pipeconfService.ofDevice(deviceId).isPresent() &&
364 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
365 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
366 } else {
367 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
368 return;
369 }
370 // Decode packet message and post event.
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200371 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf);
372 DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
373 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400374 log.debug("Received packet in: {}", event);
375 controller.postEvent(event);
376 }
377
378 private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
379
380 log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg);
381 }
382
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400383 /**
384 * Returns the internal P4 device ID associated with this client.
385 *
386 * @return P4 device ID
387 */
388 public int p4DeviceId() {
389 return p4DeviceId;
390 }
391
392 /**
393 * For testing purpose only. TODO: remove before release.
394 *
395 * @return blocking stub
396 */
397 public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
398 return this.blockingStub;
399 }
400
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200401
Andrea Campanella432f7182017-07-14 18:43:27 +0200402 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400403 public void shutdown() {
404
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400405 log.info("Shutting down client for {}...", deviceId);
406
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400407 writeLock.lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400408 try {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400409 if (streamRequestObserver != null) {
410 streamRequestObserver.onCompleted();
411 cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
412 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400413
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400414 this.executorService.shutdown();
415 try {
416 executorService.awaitTermination(5, TimeUnit.SECONDS);
417 } catch (InterruptedException e) {
418 log.warn("Executor service didn't shutdown in time.");
419 }
420 } finally {
421 writeLock.unlock();
422 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400423 }
424
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400425 /**
426 * Handles messages received from the device on the stream channel.
427 */
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400428 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
429
430 @Override
431 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400432 executorService.submit(() -> doNext(message));
433 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400434
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400435 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -0400436 try {
437 log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
438 switch (message.getUpdateCase()) {
439 case PACKET:
440 // Packet-in
441 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +0200442 return;
Carmelo Casconea966c342017-07-30 01:56:30 -0400443 case ARBITRATION:
444 doArbitrationUpdateFromDevice(message.getArbitration());
445 return;
446 default:
447 log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
448 }
449 } catch (Throwable ex) {
450 log.error("Exception while processing stream channel message from {}", deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400451 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400452 }
453
454 @Override
455 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400456 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
457 // FIXME: we might want to recreate the channel.
458 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
459 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400460 }
461
462 @Override
463 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400464 log.warn("Stream channel for {} has completed", deviceId);
465 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400466 }
467 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400468}