blob: 738417e471f501cccd8afd17dfd0533961471cdb [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone87b9b392017-10-02 18:33:20 +020019import com.google.common.collect.HashMultimap;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070021import com.google.common.collect.Lists;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020022import com.google.common.collect.Maps;
Yi Tseng82512da2017-08-16 19:46:36 -070023import com.google.common.collect.Multimap;
Carmelo Cascone7f75be42017-09-07 14:37:02 +020024import com.google.common.collect.Sets;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040025import com.google.protobuf.ByteString;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040026import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040027import io.grpc.ManagedChannel;
28import io.grpc.Status;
29import io.grpc.StatusRuntimeException;
30import io.grpc.stub.StreamObserver;
Andrea Campanella288b2732017-07-28 14:16:16 +020031import org.onlab.osgi.DefaultServiceDirectory;
Yi Tseng82512da2017-08-16 19:46:36 -070032import org.onlab.util.Tools;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040033import org.onosproject.net.DeviceId;
Yi Tseng3e7f1452017-10-20 10:31:53 -070034import org.onosproject.net.MastershipRole;
Andrea Campanella432f7182017-07-14 18:43:27 +020035import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020036import org.onosproject.net.pi.runtime.PiActionGroup;
37import org.onosproject.net.pi.runtime.PiActionGroupMember;
Yi Tseng82512da2017-08-16 19:46:36 -070038import org.onosproject.net.pi.runtime.PiActionProfileId;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020039import org.onosproject.net.pi.runtime.PiCounterCellData;
40import org.onosproject.net.pi.runtime.PiCounterCellId;
41import org.onosproject.net.pi.runtime.PiCounterId;
Carmelo Cascone7f75be42017-09-07 14:37:02 +020042import org.onosproject.net.pi.runtime.PiDirectCounterCellId;
43import org.onosproject.net.pi.runtime.PiIndirectCounterCellId;
Andrea Campanella432f7182017-07-14 18:43:27 +020044import org.onosproject.net.pi.runtime.PiPacketOperation;
Andrea Campanella288b2732017-07-28 14:16:16 +020045import org.onosproject.net.pi.runtime.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040046import org.onosproject.net.pi.runtime.PiTableEntry;
47import org.onosproject.net.pi.runtime.PiTableId;
48import org.onosproject.p4runtime.api.P4RuntimeClient;
49import org.onosproject.p4runtime.api.P4RuntimeEvent;
50import org.slf4j.Logger;
51import p4.P4RuntimeGrpc;
Yi Tseng82512da2017-08-16 19:46:36 -070052import p4.P4RuntimeOuterClass.ActionProfileGroup;
53import p4.P4RuntimeOuterClass.ActionProfileMember;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040054import p4.P4RuntimeOuterClass.Entity;
55import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
56import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
57import p4.P4RuntimeOuterClass.PacketIn;
58import p4.P4RuntimeOuterClass.ReadRequest;
59import p4.P4RuntimeOuterClass.ReadResponse;
60import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
61import p4.P4RuntimeOuterClass.StreamMessageRequest;
62import p4.P4RuntimeOuterClass.StreamMessageResponse;
63import p4.P4RuntimeOuterClass.TableEntry;
Yi Tseng3e7f1452017-10-20 10:31:53 -070064import p4.P4RuntimeOuterClass.Uint128;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040065import p4.P4RuntimeOuterClass.Update;
66import p4.P4RuntimeOuterClass.WriteRequest;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020067import p4.config.P4InfoOuterClass.P4Info;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040068import p4.tmp.P4Config;
69
Carmelo Casconed61fdb32017-10-30 10:09:57 -070070import java.nio.ByteBuffer;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040071import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040072import java.util.Collections;
73import java.util.Iterator;
74import java.util.List;
75import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020076import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020077import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040078import java.util.concurrent.CompletableFuture;
Yi Tseng3e7f1452017-10-20 10:31:53 -070079import java.util.concurrent.ExecutionException;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040080import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040081import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040082import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040083import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040084import java.util.concurrent.locks.Lock;
85import java.util.concurrent.locks.ReentrantLock;
86import java.util.function.Supplier;
87import java.util.stream.Collectors;
88import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040089
Carmelo Casconed61fdb32017-10-30 10:09:57 -070090import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040091import static org.onlab.util.Tools.groupedThreads;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040092import static org.slf4j.LoggerFactory.getLogger;
Carmelo Casconed61fdb32017-10-30 10:09:57 -070093import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
94import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
95import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020096import static p4.P4RuntimeOuterClass.PacketOut;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040097import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
98
99/**
100 * Implementation of a P4Runtime client.
101 */
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400102public final class P4RuntimeClientImpl implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400103
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400104 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
105 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
106 WriteOperationType.INSERT, Update.Type.INSERT,
107 WriteOperationType.MODIFY, Update.Type.MODIFY,
108 WriteOperationType.DELETE, Update.Type.DELETE
109 );
Yi Tseng3e7f1452017-10-20 10:31:53 -0700110 private static final String ARBITRATION_RESULT_MASTER = "Is master";
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400111
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400112 private final Logger log = getLogger(getClass());
113
114 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +0200115 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400116 private final P4RuntimeControllerImpl controller;
117 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400118 private final Context.CancellableContext cancellableContext;
119 private final ExecutorService executorService;
120 private final Executor contextExecutor;
121 private final Lock writeLock = new ReentrantLock();
122 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400123
Yi Tseng3e7f1452017-10-20 10:31:53 -0700124 private Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
125 protected Uint128 p4RuntimeElectionId;
126
Yi Tseng82512da2017-08-16 19:46:36 -0700127 /**
128 * Default constructor.
129 *
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200130 * @param deviceId the ONOS device id
Yi Tseng82512da2017-08-16 19:46:36 -0700131 * @param p4DeviceId the P4 device id
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200132 * @param channel gRPC channel
Yi Tseng82512da2017-08-16 19:46:36 -0700133 * @param controller runtime client controller
134 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200135 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
136 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400137 this.deviceId = deviceId;
138 this.p4DeviceId = p4DeviceId;
139 this.controller = controller;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400140 this.cancellableContext = Context.current().withCancellation();
Carmelo Casconea966c342017-07-30 01:56:30 -0400141 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400142 "onos/p4runtime-client-" + deviceId.toString(),
143 deviceId.toString() + "-%d"));
144 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200145 //TODO Investigate deadline or timeout in supplyInContext Method
146 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400147 P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
148 this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
149 }
150
151 /**
152 * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
153 * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
154 * <p>
155 * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
156 * <p>
157 */
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200158 private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400159 return CompletableFuture.supplyAsync(() -> {
160 // TODO: explore a more relaxed locking strategy.
161 writeLock.lock();
162 try {
163 return supplier.get();
Carmelo Casconea966c342017-07-30 01:56:30 -0400164 } catch (Throwable ex) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200165 if (ex instanceof StatusRuntimeException) {
166 log.warn("Unable to execute {} on {}: {}", opDescription, deviceId, ex.toString());
167 } else {
168 log.error("Exception in client of {}, executing {}", deviceId, opDescription, ex);
169 }
Carmelo Casconea966c342017-07-30 01:56:30 -0400170 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400171 } finally {
172 writeLock.unlock();
173 }
174 }, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400175 }
176
177 @Override
178 public CompletableFuture<Boolean> initStreamChannel() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200179 return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400180 }
181
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400182 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700183 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
184 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400185 }
186
187 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400188 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
189 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200190 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
191 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400192 }
193
194 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400195 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200196 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400197 }
198
199 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200200 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200201 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200202 }
203
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200204 @Override
205 public CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
206 PiPipeconf pipeconf) {
207 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
208 "readCounterCells-" + cellIds.hashCode());
209 }
210
211 @Override
212 public CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
213 PiPipeconf pipeconf) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200214
215 /*
216 From p4runtime.proto, the scope of a ReadRequest is defined as follows:
217 CounterEntry:
218 - All counter cells for all meters if counter_id = 0 (default).
219 - All counter cells for given counter_id if index = 0 (default).
220 DirectCounterEntry:
221 - All counter cells for all meters if counter_id = 0 (default).
222 - All counter cells for given counter_id if table_entry.match is empty.
223 */
224
225 Set<PiCounterCellId> cellIds = Sets.newHashSet();
226
227 for (PiCounterId counterId : counterIds) {
228 switch (counterId.type()) {
229 case INDIRECT:
230 cellIds.add(PiIndirectCounterCellId.of(counterId, 0));
231 break;
232 case DIRECT:
233 cellIds.add(PiDirectCounterCellId.of(counterId, PiTableEntry.EMTPY));
234 break;
235 default:
236 log.warn("Unrecognized PI counter ID '{}'", counterId.type());
237 }
238 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200239
240 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
241 "readAllCounterCells-" + cellIds.hashCode());
242 }
243
Yi Tseng82512da2017-08-16 19:46:36 -0700244 @Override
245 public CompletableFuture<Boolean> writeActionGroupMembers(PiActionGroup group,
Yi Tseng82512da2017-08-16 19:46:36 -0700246 WriteOperationType opType,
247 PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200248 return supplyInContext(() -> doWriteActionGroupMembers(group, opType, pipeconf),
Yi Tseng82512da2017-08-16 19:46:36 -0700249 "writeActionGroupMembers-" + opType.name());
250 }
251
252 @Override
253 public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
254 WriteOperationType opType,
255 PiPipeconf pipeconf) {
256 return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf),
257 "writeActionGroup-" + opType.name());
258 }
259
260 @Override
261 public CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
262 PiPipeconf pipeconf) {
263 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
264 "dumpGroups-" + actionProfileId.id());
265 }
266
Yi Tseng3e7f1452017-10-20 10:31:53 -0700267 @Override
268 public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
269 return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
270 }
271
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400272 /* Blocking method implementations below */
273
Yi Tseng3e7f1452017-10-20 10:31:53 -0700274 private boolean doArbitrationUpdate() {
275 CompletableFuture<Boolean> result = new CompletableFuture<>();
276 // TODO: currently we use 64-bit Long type for election id, should
277 // we use 128-bit ?
278 long nextElectId = controller.getNewMasterElectionId();
279 Uint128 newElectionId = Uint128.newBuilder()
280 .setLow(nextElectId)
281 .build();
282 MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
283 .setDeviceId(p4DeviceId)
284 .setElectionId(newElectionId)
285 .build();
286 StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
287 .setArbitration(arbitrationUpdate)
288 .build();
289 log.debug("Sending arbitration update to {} with election id {}...",
290 deviceId, newElectionId);
291 arbitrationUpdateMap.put(newElectionId, result);
292 try {
293 streamRequestObserver.onNext(requestMsg);
294 return result.get();
295 } catch (InterruptedException | ExecutionException | StatusRuntimeException e) {
296 log.warn("Arbitration update failed for {} due to {}", deviceId, e);
297 arbitrationUpdateMap.remove(newElectionId);
298 return false;
299 }
300 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400301 private boolean doInitStreamChannel() {
302 // To listen for packets and other events, we need to start the RPC.
303 // Here we do it by sending a master arbitration update.
Yi Tseng3e7f1452017-10-20 10:31:53 -0700304 return doArbitrationUpdate();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400305 }
306
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700307 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400308
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700309 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
310
311 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400312
313 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
314 if (p4Info == null) {
315 // Problem logged by PipeconfHelper.
316 return false;
317 }
318
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700319 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
320 .newBuilder()
321 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
322 .setReassign(true)
323 .setDeviceData(ByteString.copyFrom(deviceData))
324 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400325
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700326 ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
Andrea Campanella0288c872017-08-07 18:32:51 +0200327 .newBuilder()
328 .setDeviceId(p4DeviceId)
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700329 .setP4Info(p4Info)
330 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
331 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400332
333 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
334 .newBuilder()
Yi Tseng3e7f1452017-10-20 10:31:53 -0700335 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400336 .setAction(VERIFY_AND_COMMIT)
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700337 .addConfigs(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400338 .build();
339
340 try {
341 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700342 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400343 } catch (StatusRuntimeException ex) {
344 log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
345 return false;
346 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400347 }
348
349 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
350 PiPipeconf pipeconf) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400351 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
352
353 Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
354 .stream()
355 .map(tableEntryMsg ->
356 Update.newBuilder()
357 .setEntity(Entity.newBuilder()
358 .setTableEntry(tableEntryMsg)
359 .build())
360 .setType(UPDATE_TYPES.get(opType))
361 .build())
362 .collect(Collectors.toList());
363
364 if (updateMsgs.size() == 0) {
365 return true;
366 }
367
368 writeRequestBuilder
369 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700370 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400371 .addAllUpdates(updateMsgs)
372 .build();
373
374 try {
375 blockingStub.write(writeRequestBuilder.build());
376 return true;
377 } catch (StatusRuntimeException e) {
378 log.warn("Unable to write table entries ({}): {}", opType, e.getMessage());
379 return false;
380 }
381 }
382
383 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
384
Carmelo Cascone9f007702017-08-24 13:30:51 +0200385 log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400386
387 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
388 int tableId;
389 try {
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200390 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400391 } catch (P4InfoBrowser.NotFoundException e) {
392 log.warn("Unable to dump table: {}", e.getMessage());
393 return Collections.emptyList();
394 }
395
396 ReadRequest requestMsg = ReadRequest.newBuilder()
397 .setDeviceId(p4DeviceId)
398 .addEntities(Entity.newBuilder()
399 .setTableEntry(TableEntry.newBuilder()
400 .setTableId(tableId)
401 .build())
402 .build())
403 .build();
404
405 Iterator<ReadResponse> responses;
406 try {
407 responses = blockingStub.read(requestMsg);
408 } catch (StatusRuntimeException e) {
409 log.warn("Unable to dump table: {}", e.getMessage());
410 return Collections.emptyList();
411 }
412
413 Iterable<ReadResponse> responseIterable = () -> responses;
414 List<TableEntry> tableEntryMsgs = StreamSupport
415 .stream(responseIterable.spliterator(), false)
416 .map(ReadResponse::getEntitiesList)
417 .flatMap(List::stream)
418 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
419 .map(Entity::getTableEntry)
420 .collect(Collectors.toList());
421
Carmelo Cascone9f007702017-08-24 13:30:51 +0200422 log.debug("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400423
424 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
425 }
426
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200427 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
428 try {
429 //encode the PiPacketOperation into a PacketOut
430 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
431
432 //Build the request
433 StreamMessageRequest packetOutRequest = StreamMessageRequest
434 .newBuilder().setPacket(packetOut).build();
435
436 //Send the request
437 streamRequestObserver.onNext(packetOutRequest);
438
439 } catch (P4InfoBrowser.NotFoundException e) {
440 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
441 log.debug("Exception", e);
442 return false;
443 }
444 return true;
445 }
446
Carmelo Casconea966c342017-07-30 01:56:30 -0400447 private void doPacketIn(PacketIn packetInMsg) {
448
449 // Retrieve the pipeconf for this client's device.
450 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
451 if (pipeconfService == null) {
452 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
453 }
454 final PiPipeconf pipeconf;
455 if (pipeconfService.ofDevice(deviceId).isPresent() &&
456 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
457 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
458 } else {
459 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
460 return;
461 }
462 // Decode packet message and post event.
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200463 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf);
464 DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
465 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400466 log.debug("Received packet in: {}", event);
467 controller.postEvent(event);
468 }
469
470 private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700471 log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
Carmelo Casconea966c342017-07-30 01:56:30 -0400472
Yi Tseng3e7f1452017-10-20 10:31:53 -0700473 Uint128 electionId = arbitrationMsg.getElectionId();
474 CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
475
476 if (mastershipFeature == null) {
477 log.warn("Can't find completable future of election id {}", electionId);
478 return;
479 }
480
481 this.p4RuntimeElectionId = electionId;
482 int statusCode = arbitrationMsg.getStatus().getCode();
483 MastershipRole arbitrationRole;
484 // arbitration update success
485
486 if (statusCode == Status.OK.getCode().value()) {
487 mastershipFeature.complete(true);
488 arbitrationRole = MastershipRole.MASTER;
489 } else {
490 mastershipFeature.complete(false);
491 arbitrationRole = MastershipRole.STANDBY;
492 }
493
494 DefaultArbitration arbitrationEventSubject = new DefaultArbitration(arbitrationRole, electionId);
495 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
496 arbitrationEventSubject);
497 controller.postEvent(event);
Carmelo Casconea966c342017-07-30 01:56:30 -0400498 }
499
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200500 private Collection<PiCounterCellData> doReadCounterCells(Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
501
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200502 // We use this map to remember the original PI counter IDs of the returned response.
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200503 final Map<Integer, PiCounterId> counterIdMap = Maps.newHashMap();
504
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200505 final ReadRequest request = ReadRequest.newBuilder()
506 .setDeviceId(p4DeviceId)
507 .addAllEntities(CounterEntryCodec.encodePiCounterCellIds(cellIds, counterIdMap, pipeconf))
508 .build();
509
510 if (request.getEntitiesList().size() == 0) {
511 return Collections.emptyList();
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200512 }
513
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200514 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200515 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200516 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200517 } catch (StatusRuntimeException e) {
518 log.warn("Unable to read counters: {}", e.getMessage());
519 return Collections.emptyList();
520 }
521
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200522 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200523 .map(ReadResponse::getEntitiesList)
524 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200525 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200526
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200527 return CounterEntryCodec.decodeCounterEntities(entities, counterIdMap, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200528 }
529
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200530 private boolean doWriteActionGroupMembers(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200531 final Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
Yi Tseng82512da2017-08-16 19:46:36 -0700532 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200533 for (PiActionGroupMember member : group.members()) {
534 actionProfileMembers.add(ActionProfileMemberEncoder.encode(group, member, pipeconf));
Yi Tseng82512da2017-08-16 19:46:36 -0700535 }
536 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200537 log.warn("Unable to write ({}) group members: {}", opType, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700538 return false;
539 }
540
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200541 final Collection<Update> updateMsgs = actionProfileMembers.stream()
Yi Tseng82512da2017-08-16 19:46:36 -0700542 .map(actionProfileMember ->
543 Update.newBuilder()
544 .setEntity(Entity.newBuilder()
545 .setActionProfileMember(actionProfileMember)
546 .build())
547 .setType(UPDATE_TYPES.get(opType))
548 .build())
549 .collect(Collectors.toList());
550
551 if (updateMsgs.size() == 0) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200552 // Nothing to update.
Yi Tseng82512da2017-08-16 19:46:36 -0700553 return true;
554 }
555
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200556 WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700557 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700558 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200559 .addAllUpdates(updateMsgs)
560 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700561 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200562 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700563 return true;
564 } catch (StatusRuntimeException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200565 log.warn("Unable to write ({}) group members: {}", opType, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700566 return false;
567 }
568 }
569
570 private Collection<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
571 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
572 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200573
574 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700575 if (browser == null) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200576 log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700577 return Collections.emptySet();
578 }
579
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200580 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700581 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200582 actionProfileId = browser
583 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200584 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200585 .getPreamble()
586 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700587 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200588 log.warn("Unable to dump groups: {}", e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700589 return Collections.emptySet();
590 }
591
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200592 // Prepare read request to read all groups from the given action profile.
593 final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700594 .setDeviceId(p4DeviceId)
595 .addEntities(Entity.newBuilder()
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200596 .setActionProfileGroup(
597 ActionProfileGroup.newBuilder()
598 .setActionProfileId(actionProfileId)
599 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700600 .build())
601 .build();
602
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200603 // Read groups.
604 final Iterator<ReadResponse> groupResponses;
Yi Tseng82512da2017-08-16 19:46:36 -0700605 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200606 groupResponses = blockingStub.read(groupRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700607 } catch (StatusRuntimeException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200608 log.warn("Unable dump groups from action profile '{}': {}", piActionProfileId.id(), e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700609 return Collections.emptySet();
610 }
611
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200612 final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
613 .map(ReadResponse::getEntitiesList)
614 .flatMap(List::stream)
615 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
616 .map(Entity::getActionProfileGroup)
617 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700618
619 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200620 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700621
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200622 // Returned groups contain only a minimal description of their members.
623 // We need to issue a new request to get the full description of each member.
Yi Tseng82512da2017-08-16 19:46:36 -0700624
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200625 // Keep a map of all member IDs for each group ID, will need it later.
626 final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
627 groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
628 g.getGroupId(),
629 g.getMembersList().stream()
630 .map(ActionProfileGroup.Member::getMemberId)
631 .collect(Collectors.toList())));
Yi Tseng82512da2017-08-16 19:46:36 -0700632
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200633 // Prepare one big read request to read all members in one shot.
634 final Set<Entity> entityMsgs = groupMsgs.stream()
635 .flatMap(g -> g.getMembersList().stream())
636 .map(ActionProfileGroup.Member::getMemberId)
637 // Prevent issuing many read requests for the same member.
638 .distinct()
639 .map(id -> ActionProfileMember.newBuilder()
640 .setActionProfileId(actionProfileId)
641 .setMemberId(id)
642 .build())
643 .map(m -> Entity.newBuilder()
644 .setActionProfileMember(m)
645 .build())
646 .collect(Collectors.toSet());
647 final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
648 .addAllEntities(entityMsgs)
649 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700650
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200651 // Read members.
652 final Iterator<ReadResponse> memberResponses;
653 try {
654 memberResponses = blockingStub.read(memberRequestMsg);
655 } catch (StatusRuntimeException e) {
656 log.warn("Unable to read members from action profile {}: {}", piActionProfileId, e.getMessage());
657 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700658 }
659
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200660 final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
661 Tools.stream(() -> memberResponses)
662 .map(ReadResponse::getEntitiesList)
663 .flatMap(List::stream)
664 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
665 .map(Entity::getActionProfileMember)
666 .forEach(member -> groupIdToMemberIdsMap.asMap()
667 // Get all group IDs that contain this member.
668 .entrySet()
669 .stream()
670 .filter(entry -> entry.getValue().contains(member.getMemberId()))
671 .map(Map.Entry::getKey)
672 .forEach(gid -> groupIdToMembersMap.put(gid, member)));
673
674 log.debug("Retrieved {} group members from action profile {} on {}...",
675 groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
676
677 return groupMsgs.stream()
678 .map(groupMsg -> {
679 try {
680 return ActionProfileGroupEncoder.decode(groupMsg,
681 groupIdToMembersMap.get(groupMsg.getGroupId()),
682 pipeconf);
683 } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
684 log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
685 return null;
686 }
687 })
688 .filter(Objects::nonNull)
689 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700690 }
691
692 private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200693 final ActionProfileGroup actionProfileGroup;
Yi Tseng82512da2017-08-16 19:46:36 -0700694 try {
695 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
696 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200697 log.warn("Unable to encode group: {}", e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700698 return false;
699 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200700
701 final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700702 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700703 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200704 .addUpdates(Update.newBuilder()
705 .setEntity(Entity.newBuilder()
706 .setActionProfileGroup(actionProfileGroup)
707 .build())
708 .setType(UPDATE_TYPES.get(opType))
709 .build())
710 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700711 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200712 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700713 return true;
714 } catch (StatusRuntimeException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200715 log.warn("Unable to write groups ({}): {}", opType, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700716 return false;
717 }
718 }
719
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400720 /**
721 * Returns the internal P4 device ID associated with this client.
722 *
723 * @return P4 device ID
724 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200725 public long p4DeviceId() {
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400726 return p4DeviceId;
727 }
728
729 /**
730 * For testing purpose only. TODO: remove before release.
731 *
732 * @return blocking stub
733 */
734 public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
735 return this.blockingStub;
736 }
737
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200738
Andrea Campanella432f7182017-07-14 18:43:27 +0200739 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400740 public void shutdown() {
741
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400742 log.info("Shutting down client for {}...", deviceId);
743
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400744 writeLock.lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400745 try {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400746 if (streamRequestObserver != null) {
747 streamRequestObserver.onCompleted();
748 cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
749 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400750
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400751 this.executorService.shutdown();
752 try {
753 executorService.awaitTermination(5, TimeUnit.SECONDS);
754 } catch (InterruptedException e) {
755 log.warn("Executor service didn't shutdown in time.");
756 }
757 } finally {
758 writeLock.unlock();
759 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400760 }
761
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400762 /**
763 * Handles messages received from the device on the stream channel.
764 */
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400765 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
766
767 @Override
768 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400769 executorService.submit(() -> doNext(message));
770 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400771
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400772 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -0400773 try {
Andrea Campanella0288c872017-08-07 18:32:51 +0200774 log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -0400775 switch (message.getUpdateCase()) {
776 case PACKET:
777 // Packet-in
778 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +0200779 return;
Carmelo Casconea966c342017-07-30 01:56:30 -0400780 case ARBITRATION:
781 doArbitrationUpdateFromDevice(message.getArbitration());
782 return;
783 default:
784 log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
785 }
786 } catch (Throwable ex) {
787 log.error("Exception while processing stream channel message from {}", deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400788 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400789 }
790
791 @Override
792 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400793 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
794 // FIXME: we might want to recreate the channel.
795 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
796 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400797 }
798
799 @Override
800 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400801 log.warn("Stream channel for {} has completed", deviceId);
802 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400803 }
804 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400805}