blob: 28bcb6a943ea78d83c5019c3581e03a3df159706 [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone87b9b392017-10-02 18:33:20 +020019import com.google.common.collect.HashMultimap;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070021import com.google.common.collect.Lists;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020022import com.google.common.collect.Maps;
Yi Tseng82512da2017-08-16 19:46:36 -070023import com.google.common.collect.Multimap;
Carmelo Cascone7f75be42017-09-07 14:37:02 +020024import com.google.common.collect.Sets;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040025import com.google.protobuf.ByteString;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040026import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040027import io.grpc.ManagedChannel;
28import io.grpc.Status;
29import io.grpc.StatusRuntimeException;
30import io.grpc.stub.StreamObserver;
Andrea Campanella288b2732017-07-28 14:16:16 +020031import org.onlab.osgi.DefaultServiceDirectory;
Yi Tseng82512da2017-08-16 19:46:36 -070032import org.onlab.util.Tools;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040033import org.onosproject.net.DeviceId;
Yi Tseng3e7f1452017-10-20 10:31:53 -070034import org.onosproject.net.MastershipRole;
Carmelo Cascone87892e22017-11-13 16:01:29 -080035import org.onosproject.net.pi.model.PiActionProfileId;
36import org.onosproject.net.pi.model.PiCounterId;
37import org.onosproject.net.pi.model.PiCounterType;
Andrea Campanella432f7182017-07-14 18:43:27 +020038import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87892e22017-11-13 16:01:29 -080039import org.onosproject.net.pi.model.PiTableId;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020040import org.onosproject.net.pi.runtime.PiActionGroup;
41import org.onosproject.net.pi.runtime.PiActionGroupMember;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020042import org.onosproject.net.pi.runtime.PiCounterCellData;
43import org.onosproject.net.pi.runtime.PiCounterCellId;
Andrea Campanella432f7182017-07-14 18:43:27 +020044import org.onosproject.net.pi.runtime.PiPacketOperation;
Andrea Campanella288b2732017-07-28 14:16:16 +020045import org.onosproject.net.pi.runtime.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040046import org.onosproject.net.pi.runtime.PiTableEntry;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040047import org.onosproject.p4runtime.api.P4RuntimeClient;
48import org.onosproject.p4runtime.api.P4RuntimeEvent;
49import org.slf4j.Logger;
50import p4.P4RuntimeGrpc;
Yi Tseng82512da2017-08-16 19:46:36 -070051import p4.P4RuntimeOuterClass.ActionProfileGroup;
52import p4.P4RuntimeOuterClass.ActionProfileMember;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040053import p4.P4RuntimeOuterClass.Entity;
54import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
55import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
56import p4.P4RuntimeOuterClass.PacketIn;
57import p4.P4RuntimeOuterClass.ReadRequest;
58import p4.P4RuntimeOuterClass.ReadResponse;
59import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
60import p4.P4RuntimeOuterClass.StreamMessageRequest;
61import p4.P4RuntimeOuterClass.StreamMessageResponse;
62import p4.P4RuntimeOuterClass.TableEntry;
Yi Tseng3e7f1452017-10-20 10:31:53 -070063import p4.P4RuntimeOuterClass.Uint128;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040064import p4.P4RuntimeOuterClass.Update;
65import p4.P4RuntimeOuterClass.WriteRequest;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020066import p4.config.P4InfoOuterClass.P4Info;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040067import p4.tmp.P4Config;
68
Carmelo Casconed61fdb32017-10-30 10:09:57 -070069import java.nio.ByteBuffer;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040070import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040071import java.util.Collections;
72import java.util.Iterator;
73import java.util.List;
74import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020075import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020076import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040077import java.util.concurrent.CompletableFuture;
Yi Tseng3e7f1452017-10-20 10:31:53 -070078import java.util.concurrent.ExecutionException;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040079import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040080import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040081import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040082import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040083import java.util.concurrent.locks.Lock;
84import java.util.concurrent.locks.ReentrantLock;
85import java.util.function.Supplier;
86import java.util.stream.Collectors;
87import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040088
Carmelo Casconed61fdb32017-10-30 10:09:57 -070089import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040090import static org.onlab.util.Tools.groupedThreads;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040091import static org.slf4j.LoggerFactory.getLogger;
Carmelo Casconed61fdb32017-10-30 10:09:57 -070092import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
93import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
94import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020095import static p4.P4RuntimeOuterClass.PacketOut;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040096import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
97
98/**
99 * Implementation of a P4Runtime client.
100 */
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400101public final class P4RuntimeClientImpl implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400102
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400103 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
104 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
105 WriteOperationType.INSERT, Update.Type.INSERT,
106 WriteOperationType.MODIFY, Update.Type.MODIFY,
107 WriteOperationType.DELETE, Update.Type.DELETE
108 );
109
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400110 private final Logger log = getLogger(getClass());
111
112 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +0200113 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400114 private final P4RuntimeControllerImpl controller;
115 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400116 private final Context.CancellableContext cancellableContext;
117 private final ExecutorService executorService;
118 private final Executor contextExecutor;
119 private final Lock writeLock = new ReentrantLock();
120 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400121
Yi Tseng3e7f1452017-10-20 10:31:53 -0700122 private Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
123 protected Uint128 p4RuntimeElectionId;
124
Yi Tseng82512da2017-08-16 19:46:36 -0700125 /**
126 * Default constructor.
127 *
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200128 * @param deviceId the ONOS device id
Yi Tseng82512da2017-08-16 19:46:36 -0700129 * @param p4DeviceId the P4 device id
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200130 * @param channel gRPC channel
Yi Tseng82512da2017-08-16 19:46:36 -0700131 * @param controller runtime client controller
132 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200133 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
134 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400135 this.deviceId = deviceId;
136 this.p4DeviceId = p4DeviceId;
137 this.controller = controller;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400138 this.cancellableContext = Context.current().withCancellation();
Carmelo Casconea966c342017-07-30 01:56:30 -0400139 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400140 "onos/p4runtime-client-" + deviceId.toString(),
141 deviceId.toString() + "-%d"));
142 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200143 //TODO Investigate deadline or timeout in supplyInContext Method
144 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400145 P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
146 this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
147 }
148
149 /**
150 * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
151 * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
152 * <p>
153 * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
154 * <p>
155 */
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200156 private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400157 return CompletableFuture.supplyAsync(() -> {
158 // TODO: explore a more relaxed locking strategy.
159 writeLock.lock();
160 try {
161 return supplier.get();
Carmelo Casconea966c342017-07-30 01:56:30 -0400162 } catch (Throwable ex) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200163 if (ex instanceof StatusRuntimeException) {
164 log.warn("Unable to execute {} on {}: {}", opDescription, deviceId, ex.toString());
165 } else {
166 log.error("Exception in client of {}, executing {}", deviceId, opDescription, ex);
167 }
Carmelo Casconea966c342017-07-30 01:56:30 -0400168 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400169 } finally {
170 writeLock.unlock();
171 }
172 }, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400173 }
174
175 @Override
176 public CompletableFuture<Boolean> initStreamChannel() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200177 return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400178 }
179
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400180 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700181 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
182 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400183 }
184
185 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400186 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
187 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200188 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
189 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400190 }
191
192 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400193 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200194 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400195 }
196
197 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200198 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200199 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200200 }
201
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200202 @Override
203 public CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
204 PiPipeconf pipeconf) {
205 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
206 "readCounterCells-" + cellIds.hashCode());
207 }
208
209 @Override
210 public CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
211 PiPipeconf pipeconf) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200212
213 /*
214 From p4runtime.proto, the scope of a ReadRequest is defined as follows:
215 CounterEntry:
216 - All counter cells for all meters if counter_id = 0 (default).
217 - All counter cells for given counter_id if index = 0 (default).
218 DirectCounterEntry:
219 - All counter cells for all meters if counter_id = 0 (default).
220 - All counter cells for given counter_id if table_entry.match is empty.
221 */
222
223 Set<PiCounterCellId> cellIds = Sets.newHashSet();
224
225 for (PiCounterId counterId : counterIds) {
Carmelo Cascone87892e22017-11-13 16:01:29 -0800226 if (!pipeconf.pipelineModel().counter(counterId).isPresent()) {
227 log.warn("Unable to find counter '{}' in pipeline model", counterId);
228 continue;
229 }
230 PiCounterType counterType = pipeconf.pipelineModel().counter(counterId).get().counterType();
231 switch (counterType) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200232 case INDIRECT:
Carmelo Cascone87892e22017-11-13 16:01:29 -0800233 cellIds.add(PiCounterCellId.ofIndirect(counterId, 0));
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200234 break;
235 case DIRECT:
Carmelo Cascone87892e22017-11-13 16:01:29 -0800236 cellIds.add(PiCounterCellId.ofDirect(counterId, PiTableEntry.EMTPY));
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200237 break;
238 default:
Carmelo Cascone87892e22017-11-13 16:01:29 -0800239 log.warn("Unrecognized PI counter type '{}'", counterType);
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200240 }
241 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200242
243 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
244 "readAllCounterCells-" + cellIds.hashCode());
245 }
246
Yi Tseng82512da2017-08-16 19:46:36 -0700247 @Override
248 public CompletableFuture<Boolean> writeActionGroupMembers(PiActionGroup group,
Yi Tseng82512da2017-08-16 19:46:36 -0700249 WriteOperationType opType,
250 PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200251 return supplyInContext(() -> doWriteActionGroupMembers(group, opType, pipeconf),
Yi Tseng82512da2017-08-16 19:46:36 -0700252 "writeActionGroupMembers-" + opType.name());
253 }
254
255 @Override
256 public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
257 WriteOperationType opType,
258 PiPipeconf pipeconf) {
259 return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf),
260 "writeActionGroup-" + opType.name());
261 }
262
263 @Override
264 public CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
265 PiPipeconf pipeconf) {
266 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
267 "dumpGroups-" + actionProfileId.id());
268 }
269
Yi Tseng3e7f1452017-10-20 10:31:53 -0700270 @Override
271 public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
272 return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
273 }
274
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400275 /* Blocking method implementations below */
276
Yi Tseng3e7f1452017-10-20 10:31:53 -0700277 private boolean doArbitrationUpdate() {
278 CompletableFuture<Boolean> result = new CompletableFuture<>();
279 // TODO: currently we use 64-bit Long type for election id, should
280 // we use 128-bit ?
281 long nextElectId = controller.getNewMasterElectionId();
282 Uint128 newElectionId = Uint128.newBuilder()
283 .setLow(nextElectId)
284 .build();
285 MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
286 .setDeviceId(p4DeviceId)
287 .setElectionId(newElectionId)
288 .build();
289 StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
290 .setArbitration(arbitrationUpdate)
291 .build();
292 log.debug("Sending arbitration update to {} with election id {}...",
293 deviceId, newElectionId);
294 arbitrationUpdateMap.put(newElectionId, result);
295 try {
296 streamRequestObserver.onNext(requestMsg);
297 return result.get();
298 } catch (InterruptedException | ExecutionException | StatusRuntimeException e) {
299 log.warn("Arbitration update failed for {} due to {}", deviceId, e);
300 arbitrationUpdateMap.remove(newElectionId);
301 return false;
302 }
303 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400304 private boolean doInitStreamChannel() {
305 // To listen for packets and other events, we need to start the RPC.
306 // Here we do it by sending a master arbitration update.
Yi Tseng3e7f1452017-10-20 10:31:53 -0700307 return doArbitrationUpdate();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400308 }
309
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700310 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400311
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700312 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
313
314 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400315
316 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
317 if (p4Info == null) {
318 // Problem logged by PipeconfHelper.
319 return false;
320 }
321
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700322 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
323 .newBuilder()
324 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
325 .setReassign(true)
326 .setDeviceData(ByteString.copyFrom(deviceData))
327 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400328
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700329 ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
Andrea Campanella0288c872017-08-07 18:32:51 +0200330 .newBuilder()
331 .setDeviceId(p4DeviceId)
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700332 .setP4Info(p4Info)
333 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
334 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400335
336 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
337 .newBuilder()
Yi Tseng3e7f1452017-10-20 10:31:53 -0700338 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400339 .setAction(VERIFY_AND_COMMIT)
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700340 .addConfigs(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400341 .build();
342
343 try {
344 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700345 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400346 } catch (StatusRuntimeException ex) {
347 log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
348 return false;
349 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400350 }
351
352 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
353 PiPipeconf pipeconf) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400354 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
355
356 Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
357 .stream()
358 .map(tableEntryMsg ->
359 Update.newBuilder()
360 .setEntity(Entity.newBuilder()
361 .setTableEntry(tableEntryMsg)
362 .build())
363 .setType(UPDATE_TYPES.get(opType))
364 .build())
365 .collect(Collectors.toList());
366
367 if (updateMsgs.size() == 0) {
368 return true;
369 }
370
371 writeRequestBuilder
372 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700373 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400374 .addAllUpdates(updateMsgs)
375 .build();
376
377 try {
378 blockingStub.write(writeRequestBuilder.build());
379 return true;
380 } catch (StatusRuntimeException e) {
381 log.warn("Unable to write table entries ({}): {}", opType, e.getMessage());
382 return false;
383 }
384 }
385
386 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
387
Carmelo Cascone9f007702017-08-24 13:30:51 +0200388 log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400389
390 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
391 int tableId;
392 try {
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200393 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400394 } catch (P4InfoBrowser.NotFoundException e) {
395 log.warn("Unable to dump table: {}", e.getMessage());
396 return Collections.emptyList();
397 }
398
399 ReadRequest requestMsg = ReadRequest.newBuilder()
400 .setDeviceId(p4DeviceId)
401 .addEntities(Entity.newBuilder()
402 .setTableEntry(TableEntry.newBuilder()
403 .setTableId(tableId)
404 .build())
405 .build())
406 .build();
407
408 Iterator<ReadResponse> responses;
409 try {
410 responses = blockingStub.read(requestMsg);
411 } catch (StatusRuntimeException e) {
412 log.warn("Unable to dump table: {}", e.getMessage());
413 return Collections.emptyList();
414 }
415
416 Iterable<ReadResponse> responseIterable = () -> responses;
417 List<TableEntry> tableEntryMsgs = StreamSupport
418 .stream(responseIterable.spliterator(), false)
419 .map(ReadResponse::getEntitiesList)
420 .flatMap(List::stream)
421 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
422 .map(Entity::getTableEntry)
423 .collect(Collectors.toList());
424
Carmelo Cascone9f007702017-08-24 13:30:51 +0200425 log.debug("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400426
427 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
428 }
429
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200430 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
431 try {
432 //encode the PiPacketOperation into a PacketOut
433 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
434
435 //Build the request
436 StreamMessageRequest packetOutRequest = StreamMessageRequest
437 .newBuilder().setPacket(packetOut).build();
438
439 //Send the request
440 streamRequestObserver.onNext(packetOutRequest);
441
442 } catch (P4InfoBrowser.NotFoundException e) {
443 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
444 log.debug("Exception", e);
445 return false;
446 }
447 return true;
448 }
449
Carmelo Casconea966c342017-07-30 01:56:30 -0400450 private void doPacketIn(PacketIn packetInMsg) {
451
452 // Retrieve the pipeconf for this client's device.
453 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
454 if (pipeconfService == null) {
455 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
456 }
457 final PiPipeconf pipeconf;
458 if (pipeconfService.ofDevice(deviceId).isPresent() &&
459 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
460 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
461 } else {
462 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
463 return;
464 }
465 // Decode packet message and post event.
Carmelo Cascone87892e22017-11-13 16:01:29 -0800466 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200467 DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
468 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400469 log.debug("Received packet in: {}", event);
470 controller.postEvent(event);
471 }
472
473 private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700474 log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
Carmelo Casconea966c342017-07-30 01:56:30 -0400475
Yi Tseng3e7f1452017-10-20 10:31:53 -0700476 Uint128 electionId = arbitrationMsg.getElectionId();
477 CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
478
479 if (mastershipFeature == null) {
480 log.warn("Can't find completable future of election id {}", electionId);
481 return;
482 }
483
484 this.p4RuntimeElectionId = electionId;
485 int statusCode = arbitrationMsg.getStatus().getCode();
486 MastershipRole arbitrationRole;
487 // arbitration update success
488
489 if (statusCode == Status.OK.getCode().value()) {
490 mastershipFeature.complete(true);
491 arbitrationRole = MastershipRole.MASTER;
492 } else {
493 mastershipFeature.complete(false);
494 arbitrationRole = MastershipRole.STANDBY;
495 }
496
497 DefaultArbitration arbitrationEventSubject = new DefaultArbitration(arbitrationRole, electionId);
498 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
499 arbitrationEventSubject);
500 controller.postEvent(event);
Carmelo Casconea966c342017-07-30 01:56:30 -0400501 }
502
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200503 private Collection<PiCounterCellData> doReadCounterCells(Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
504
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200505 // We use this map to remember the original PI counter IDs of the returned response.
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200506 final Map<Integer, PiCounterId> counterIdMap = Maps.newHashMap();
507
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200508 final ReadRequest request = ReadRequest.newBuilder()
509 .setDeviceId(p4DeviceId)
510 .addAllEntities(CounterEntryCodec.encodePiCounterCellIds(cellIds, counterIdMap, pipeconf))
511 .build();
512
513 if (request.getEntitiesList().size() == 0) {
514 return Collections.emptyList();
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200515 }
516
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200517 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200518 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200519 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200520 } catch (StatusRuntimeException e) {
521 log.warn("Unable to read counters: {}", e.getMessage());
522 return Collections.emptyList();
523 }
524
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200525 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200526 .map(ReadResponse::getEntitiesList)
527 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200528 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200529
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200530 return CounterEntryCodec.decodeCounterEntities(entities, counterIdMap, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200531 }
532
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200533 private boolean doWriteActionGroupMembers(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200534 final Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
Yi Tseng82512da2017-08-16 19:46:36 -0700535 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200536 for (PiActionGroupMember member : group.members()) {
537 actionProfileMembers.add(ActionProfileMemberEncoder.encode(group, member, pipeconf));
Yi Tseng82512da2017-08-16 19:46:36 -0700538 }
539 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200540 log.warn("Unable to write ({}) group members: {}", opType, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700541 return false;
542 }
543
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200544 final Collection<Update> updateMsgs = actionProfileMembers.stream()
Yi Tseng82512da2017-08-16 19:46:36 -0700545 .map(actionProfileMember ->
546 Update.newBuilder()
547 .setEntity(Entity.newBuilder()
548 .setActionProfileMember(actionProfileMember)
549 .build())
550 .setType(UPDATE_TYPES.get(opType))
551 .build())
552 .collect(Collectors.toList());
553
554 if (updateMsgs.size() == 0) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200555 // Nothing to update.
Yi Tseng82512da2017-08-16 19:46:36 -0700556 return true;
557 }
558
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200559 WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700560 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700561 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200562 .addAllUpdates(updateMsgs)
563 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700564 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200565 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700566 return true;
567 } catch (StatusRuntimeException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200568 log.warn("Unable to write ({}) group members: {}", opType, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700569 return false;
570 }
571 }
572
573 private Collection<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
574 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
575 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200576
577 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700578 if (browser == null) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200579 log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700580 return Collections.emptySet();
581 }
582
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200583 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700584 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200585 actionProfileId = browser
586 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200587 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200588 .getPreamble()
589 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700590 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200591 log.warn("Unable to dump groups: {}", e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700592 return Collections.emptySet();
593 }
594
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200595 // Prepare read request to read all groups from the given action profile.
596 final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700597 .setDeviceId(p4DeviceId)
598 .addEntities(Entity.newBuilder()
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200599 .setActionProfileGroup(
600 ActionProfileGroup.newBuilder()
601 .setActionProfileId(actionProfileId)
602 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700603 .build())
604 .build();
605
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200606 // Read groups.
607 final Iterator<ReadResponse> groupResponses;
Yi Tseng82512da2017-08-16 19:46:36 -0700608 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200609 groupResponses = blockingStub.read(groupRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700610 } catch (StatusRuntimeException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200611 log.warn("Unable dump groups from action profile '{}': {}", piActionProfileId.id(), e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700612 return Collections.emptySet();
613 }
614
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200615 final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
616 .map(ReadResponse::getEntitiesList)
617 .flatMap(List::stream)
618 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
619 .map(Entity::getActionProfileGroup)
620 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700621
622 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200623 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700624
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200625 // Returned groups contain only a minimal description of their members.
626 // We need to issue a new request to get the full description of each member.
Yi Tseng82512da2017-08-16 19:46:36 -0700627
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200628 // Keep a map of all member IDs for each group ID, will need it later.
629 final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
630 groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
631 g.getGroupId(),
632 g.getMembersList().stream()
633 .map(ActionProfileGroup.Member::getMemberId)
634 .collect(Collectors.toList())));
Yi Tseng82512da2017-08-16 19:46:36 -0700635
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200636 // Prepare one big read request to read all members in one shot.
637 final Set<Entity> entityMsgs = groupMsgs.stream()
638 .flatMap(g -> g.getMembersList().stream())
639 .map(ActionProfileGroup.Member::getMemberId)
640 // Prevent issuing many read requests for the same member.
641 .distinct()
642 .map(id -> ActionProfileMember.newBuilder()
643 .setActionProfileId(actionProfileId)
644 .setMemberId(id)
645 .build())
646 .map(m -> Entity.newBuilder()
647 .setActionProfileMember(m)
648 .build())
649 .collect(Collectors.toSet());
650 final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
651 .addAllEntities(entityMsgs)
652 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700653
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200654 // Read members.
655 final Iterator<ReadResponse> memberResponses;
656 try {
657 memberResponses = blockingStub.read(memberRequestMsg);
658 } catch (StatusRuntimeException e) {
659 log.warn("Unable to read members from action profile {}: {}", piActionProfileId, e.getMessage());
660 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700661 }
662
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200663 final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
664 Tools.stream(() -> memberResponses)
665 .map(ReadResponse::getEntitiesList)
666 .flatMap(List::stream)
667 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
668 .map(Entity::getActionProfileMember)
669 .forEach(member -> groupIdToMemberIdsMap.asMap()
670 // Get all group IDs that contain this member.
671 .entrySet()
672 .stream()
673 .filter(entry -> entry.getValue().contains(member.getMemberId()))
674 .map(Map.Entry::getKey)
675 .forEach(gid -> groupIdToMembersMap.put(gid, member)));
676
677 log.debug("Retrieved {} group members from action profile {} on {}...",
678 groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
679
680 return groupMsgs.stream()
681 .map(groupMsg -> {
682 try {
683 return ActionProfileGroupEncoder.decode(groupMsg,
684 groupIdToMembersMap.get(groupMsg.getGroupId()),
685 pipeconf);
686 } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
687 log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
688 return null;
689 }
690 })
691 .filter(Objects::nonNull)
692 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700693 }
694
695 private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200696 final ActionProfileGroup actionProfileGroup;
Yi Tseng82512da2017-08-16 19:46:36 -0700697 try {
698 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
699 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200700 log.warn("Unable to encode group: {}", e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700701 return false;
702 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200703
704 final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700705 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700706 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200707 .addUpdates(Update.newBuilder()
708 .setEntity(Entity.newBuilder()
709 .setActionProfileGroup(actionProfileGroup)
710 .build())
711 .setType(UPDATE_TYPES.get(opType))
712 .build())
713 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700714 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200715 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700716 return true;
717 } catch (StatusRuntimeException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200718 log.warn("Unable to write groups ({}): {}", opType, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700719 return false;
720 }
721 }
722
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400723 /**
724 * Returns the internal P4 device ID associated with this client.
725 *
726 * @return P4 device ID
727 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200728 public long p4DeviceId() {
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400729 return p4DeviceId;
730 }
731
732 /**
733 * For testing purpose only. TODO: remove before release.
734 *
735 * @return blocking stub
736 */
737 public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
738 return this.blockingStub;
739 }
740
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200741
Andrea Campanella432f7182017-07-14 18:43:27 +0200742 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400743 public void shutdown() {
744
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400745 log.info("Shutting down client for {}...", deviceId);
746
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400747 writeLock.lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400748 try {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400749 if (streamRequestObserver != null) {
750 streamRequestObserver.onCompleted();
751 cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
752 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400753
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400754 this.executorService.shutdown();
755 try {
756 executorService.awaitTermination(5, TimeUnit.SECONDS);
757 } catch (InterruptedException e) {
758 log.warn("Executor service didn't shutdown in time.");
759 }
760 } finally {
761 writeLock.unlock();
762 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400763 }
764
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400765 /**
766 * Handles messages received from the device on the stream channel.
767 */
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400768 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
769
770 @Override
771 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400772 executorService.submit(() -> doNext(message));
773 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400774
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400775 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -0400776 try {
Andrea Campanella0288c872017-08-07 18:32:51 +0200777 log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -0400778 switch (message.getUpdateCase()) {
779 case PACKET:
780 // Packet-in
781 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +0200782 return;
Carmelo Casconea966c342017-07-30 01:56:30 -0400783 case ARBITRATION:
784 doArbitrationUpdateFromDevice(message.getArbitration());
785 return;
786 default:
787 log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
788 }
789 } catch (Throwable ex) {
790 log.error("Exception while processing stream channel message from {}", deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400791 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400792 }
793
794 @Override
795 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400796 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
797 // FIXME: we might want to recreate the channel.
798 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
799 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400800 }
801
802 @Override
803 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400804 log.warn("Stream channel for {} has completed", deviceId);
805 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400806 }
807 }
Carmelo Cascone87892e22017-11-13 16:01:29 -0800808}