blob: 7f22e79dc18a3d8b52c6aba19264a6b95ad02c6c [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone8d99b172017-07-18 17:26:31 -040019import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070020import com.google.common.collect.Lists;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020021import com.google.common.collect.Maps;
Yi Tseng82512da2017-08-16 19:46:36 -070022import com.google.common.collect.HashMultimap;
23import com.google.common.collect.Multimap;
Carmelo Cascone7f75be42017-09-07 14:37:02 +020024import com.google.common.collect.Sets;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040025import com.google.protobuf.ByteString;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040026import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040027import io.grpc.ManagedChannel;
28import io.grpc.Status;
29import io.grpc.StatusRuntimeException;
30import io.grpc.stub.StreamObserver;
Andrea Campanella288b2732017-07-28 14:16:16 +020031import org.onlab.osgi.DefaultServiceDirectory;
Yi Tseng82512da2017-08-16 19:46:36 -070032import org.onlab.util.Tools;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040033import org.onosproject.net.DeviceId;
Andrea Campanella432f7182017-07-14 18:43:27 +020034import org.onosproject.net.pi.model.PiPipeconf;
Yi Tseng82512da2017-08-16 19:46:36 -070035import org.onosproject.net.pi.runtime.PiActionProfileId;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020036import org.onosproject.net.pi.runtime.PiCounterCellData;
37import org.onosproject.net.pi.runtime.PiCounterCellId;
38import org.onosproject.net.pi.runtime.PiCounterId;
Carmelo Cascone7f75be42017-09-07 14:37:02 +020039import org.onosproject.net.pi.runtime.PiDirectCounterCellId;
40import org.onosproject.net.pi.runtime.PiIndirectCounterCellId;
Yi Tseng82512da2017-08-16 19:46:36 -070041import org.onosproject.net.pi.runtime.PiActionGroup;
42import org.onosproject.net.pi.runtime.PiActionGroupMember;
Andrea Campanella432f7182017-07-14 18:43:27 +020043import org.onosproject.net.pi.runtime.PiPacketOperation;
Andrea Campanella288b2732017-07-28 14:16:16 +020044import org.onosproject.net.pi.runtime.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040045import org.onosproject.net.pi.runtime.PiTableEntry;
46import org.onosproject.net.pi.runtime.PiTableId;
47import org.onosproject.p4runtime.api.P4RuntimeClient;
48import org.onosproject.p4runtime.api.P4RuntimeEvent;
49import org.slf4j.Logger;
50import p4.P4RuntimeGrpc;
Yi Tseng82512da2017-08-16 19:46:36 -070051import p4.P4RuntimeOuterClass.ActionProfileGroup;
52import p4.P4RuntimeOuterClass.ActionProfileMember;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040053import p4.P4RuntimeOuterClass.Entity;
54import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
55import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
56import p4.P4RuntimeOuterClass.PacketIn;
57import p4.P4RuntimeOuterClass.ReadRequest;
58import p4.P4RuntimeOuterClass.ReadResponse;
59import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
60import p4.P4RuntimeOuterClass.StreamMessageRequest;
61import p4.P4RuntimeOuterClass.StreamMessageResponse;
62import p4.P4RuntimeOuterClass.TableEntry;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040063import p4.P4RuntimeOuterClass.Update;
64import p4.P4RuntimeOuterClass.WriteRequest;
Yi Tseng82512da2017-08-16 19:46:36 -070065import p4.config.P4InfoOuterClass;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020066import p4.config.P4InfoOuterClass.P4Info;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040067import p4.tmp.P4Config;
68
69import java.io.IOException;
70import java.io.InputStream;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040071import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040072import java.util.Collections;
73import java.util.Iterator;
74import java.util.List;
75import java.util.Map;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020076import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040077import java.util.concurrent.CompletableFuture;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040078import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040079import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040080import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040081import java.util.concurrent.TimeUnit;
Yi Tseng82512da2017-08-16 19:46:36 -070082import java.util.concurrent.atomic.AtomicBoolean;
83import java.util.concurrent.atomic.AtomicLong;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040084import java.util.concurrent.locks.Lock;
85import java.util.concurrent.locks.ReentrantLock;
86import java.util.function.Supplier;
87import java.util.stream.Collectors;
88import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040089
Carmelo Cascone8d99b172017-07-18 17:26:31 -040090import static org.onlab.util.Tools.groupedThreads;
91import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040092import static org.slf4j.LoggerFactory.getLogger;
Yi Tseng82512da2017-08-16 19:46:36 -070093import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
94import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040095import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020096import static p4.P4RuntimeOuterClass.PacketOut;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040097import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
98
99/**
100 * Implementation of a P4Runtime client.
101 */
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400102public final class P4RuntimeClientImpl implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400103
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400104 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
105 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
106 WriteOperationType.INSERT, Update.Type.INSERT,
107 WriteOperationType.MODIFY, Update.Type.MODIFY,
108 WriteOperationType.DELETE, Update.Type.DELETE
109 );
110
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400111 private final Logger log = getLogger(getClass());
112
113 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +0200114 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400115 private final P4RuntimeControllerImpl controller;
116 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400117 private final Context.CancellableContext cancellableContext;
118 private final ExecutorService executorService;
119 private final Executor contextExecutor;
120 private final Lock writeLock = new ReentrantLock();
121 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400122
Yi Tseng82512da2017-08-16 19:46:36 -0700123 /**
124 * Default constructor.
125 *
126 * @param deviceId the ONOS device id
127 * @param p4DeviceId the P4 device id
128 * @param channel gRPC channel
129 * @param controller runtime client controller
130 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200131 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
132 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400133 this.deviceId = deviceId;
134 this.p4DeviceId = p4DeviceId;
135 this.controller = controller;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400136 this.cancellableContext = Context.current().withCancellation();
Carmelo Casconea966c342017-07-30 01:56:30 -0400137 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400138 "onos/p4runtime-client-" + deviceId.toString(),
139 deviceId.toString() + "-%d"));
140 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200141 //TODO Investigate deadline or timeout in supplyInContext Method
142 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400143 P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
144 this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
145 }
146
147 /**
148 * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
149 * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
150 * <p>
151 * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
152 * <p>
153 */
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200154 private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400155 return CompletableFuture.supplyAsync(() -> {
156 // TODO: explore a more relaxed locking strategy.
157 writeLock.lock();
158 try {
159 return supplier.get();
Carmelo Casconea966c342017-07-30 01:56:30 -0400160 } catch (Throwable ex) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200161 if (ex instanceof StatusRuntimeException) {
162 log.warn("Unable to execute {} on {}: {}", opDescription, deviceId, ex.toString());
163 } else {
164 log.error("Exception in client of {}, executing {}", deviceId, opDescription, ex);
165 }
Carmelo Casconea966c342017-07-30 01:56:30 -0400166 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400167 } finally {
168 writeLock.unlock();
169 }
170 }, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400171 }
172
173 @Override
174 public CompletableFuture<Boolean> initStreamChannel() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200175 return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400176 }
177
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400178 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400179 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200180 return supplyInContext(() -> doSetPipelineConfig(pipeconf, targetConfigExtType), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400181 }
182
183 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400184 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
185 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200186 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
187 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400188 }
189
190 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400191 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200192 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400193 }
194
195 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200196 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200197 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200198 }
199
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200200 @Override
201 public CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
202 PiPipeconf pipeconf) {
203 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
204 "readCounterCells-" + cellIds.hashCode());
205 }
206
207 @Override
208 public CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
209 PiPipeconf pipeconf) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200210
211 /*
212 From p4runtime.proto, the scope of a ReadRequest is defined as follows:
213 CounterEntry:
214 - All counter cells for all meters if counter_id = 0 (default).
215 - All counter cells for given counter_id if index = 0 (default).
216 DirectCounterEntry:
217 - All counter cells for all meters if counter_id = 0 (default).
218 - All counter cells for given counter_id if table_entry.match is empty.
219 */
220
221 Set<PiCounterCellId> cellIds = Sets.newHashSet();
222
223 for (PiCounterId counterId : counterIds) {
224 switch (counterId.type()) {
225 case INDIRECT:
226 cellIds.add(PiIndirectCounterCellId.of(counterId, 0));
227 break;
228 case DIRECT:
229 cellIds.add(PiDirectCounterCellId.of(counterId, PiTableEntry.EMTPY));
230 break;
231 default:
232 log.warn("Unrecognized PI counter ID '{}'", counterId.type());
233 }
234 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200235
236 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
237 "readAllCounterCells-" + cellIds.hashCode());
238 }
239
Yi Tseng82512da2017-08-16 19:46:36 -0700240 @Override
241 public CompletableFuture<Boolean> writeActionGroupMembers(PiActionGroup group,
242 Collection<PiActionGroupMember> members,
243 WriteOperationType opType,
244 PiPipeconf pipeconf) {
245 return supplyInContext(() -> doWriteActionGroupMembers(group, members, opType, pipeconf),
246 "writeActionGroupMembers-" + opType.name());
247 }
248
249 @Override
250 public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
251 WriteOperationType opType,
252 PiPipeconf pipeconf) {
253 return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf),
254 "writeActionGroup-" + opType.name());
255 }
256
257 @Override
258 public CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
259 PiPipeconf pipeconf) {
260 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
261 "dumpGroups-" + actionProfileId.id());
262 }
263
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400264 /* Blocking method implementations below */
265
266 private boolean doInitStreamChannel() {
267 // To listen for packets and other events, we need to start the RPC.
268 // Here we do it by sending a master arbitration update.
269 log.info("initializing stream chanel on {}...", deviceId);
270 if (!doArbitrationUpdate()) {
271 log.warn("Unable to initialize stream channel for {}", deviceId);
272 return false;
273 } else {
274 return true;
275 }
276 }
277
278 private boolean doArbitrationUpdate() {
279 log.info("Sending arbitration update to {}...", deviceId);
280 StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
281 .setArbitration(MasterArbitrationUpdate.newBuilder()
282 .setDeviceId(p4DeviceId)
283 .build())
284 .build();
285 try {
286 streamRequestObserver.onNext(requestMsg);
287 return true;
288 } catch (StatusRuntimeException e) {
289 log.warn("Arbitration update failed for {}: {}", deviceId, e);
290 return false;
291 }
292 }
293
294 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
295
296 log.info("Setting pipeline config for {} to {} using {}...", deviceId, pipeconf.id(), targetConfigExtType);
297
298 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
299 if (p4Info == null) {
300 // Problem logged by PipeconfHelper.
301 return false;
302 }
303
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400304
Andrea Campanella0288c872017-08-07 18:32:51 +0200305 ForwardingPipelineConfig.Builder pipelineConfigBuilder = ForwardingPipelineConfig
306 .newBuilder()
307 .setDeviceId(p4DeviceId)
308 .setP4Info(p4Info);
309
310 //if the target config extension is null we don't want to add the config.
311 if (targetConfigExtType != null) {
312 if (!pipeconf.extension(targetConfigExtType).isPresent()) {
313 log.warn("Missing extension {} in pipeconf {}", targetConfigExtType, pipeconf.id());
314 return false;
315 }
316 InputStream targetConfig = pipeconf.extension(targetConfigExtType).get();
317 P4Config.P4DeviceConfig p4DeviceConfigMsg;
318 try {
319 p4DeviceConfigMsg = P4Config.P4DeviceConfig
320 .newBuilder()
321 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
322 .setReassign(true)
323 .setDeviceData(ByteString.readFrom(targetConfig))
324 .build();
325
326 pipelineConfigBuilder.setP4DeviceConfig(p4DeviceConfigMsg.toByteString());
327
328 } catch (IOException ex) {
329 log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
330 return false;
331 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400332 }
333
334 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
335 .newBuilder()
336 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella0288c872017-08-07 18:32:51 +0200337 .addConfigs(pipelineConfigBuilder.build())
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400338 .build();
339
340 try {
341 this.blockingStub.setForwardingPipelineConfig(request);
342
343 } catch (StatusRuntimeException ex) {
344 log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
345 return false;
346 }
347
348 return true;
349 }
350
351 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
352 PiPipeconf pipeconf) {
353
354 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
355
356 Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
357 .stream()
358 .map(tableEntryMsg ->
359 Update.newBuilder()
360 .setEntity(Entity.newBuilder()
361 .setTableEntry(tableEntryMsg)
362 .build())
363 .setType(UPDATE_TYPES.get(opType))
364 .build())
365 .collect(Collectors.toList());
366
367 if (updateMsgs.size() == 0) {
368 return true;
369 }
370
371 writeRequestBuilder
372 .setDeviceId(p4DeviceId)
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200373 /* PI ignores this ElectionId, commenting out for now.
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400374 .setElectionId(Uint128.newBuilder()
375 .setHigh(0)
376 .setLow(ELECTION_ID)
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200377 .build()) */
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400378 .addAllUpdates(updateMsgs)
379 .build();
380
381 try {
382 blockingStub.write(writeRequestBuilder.build());
383 return true;
384 } catch (StatusRuntimeException e) {
385 log.warn("Unable to write table entries ({}): {}", opType, e.getMessage());
386 return false;
387 }
388 }
389
390 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
391
Carmelo Cascone9f007702017-08-24 13:30:51 +0200392 log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400393
394 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
395 int tableId;
396 try {
397 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
398 } catch (P4InfoBrowser.NotFoundException e) {
399 log.warn("Unable to dump table: {}", e.getMessage());
400 return Collections.emptyList();
401 }
402
403 ReadRequest requestMsg = ReadRequest.newBuilder()
404 .setDeviceId(p4DeviceId)
405 .addEntities(Entity.newBuilder()
406 .setTableEntry(TableEntry.newBuilder()
407 .setTableId(tableId)
408 .build())
409 .build())
410 .build();
411
412 Iterator<ReadResponse> responses;
413 try {
414 responses = blockingStub.read(requestMsg);
415 } catch (StatusRuntimeException e) {
416 log.warn("Unable to dump table: {}", e.getMessage());
417 return Collections.emptyList();
418 }
419
420 Iterable<ReadResponse> responseIterable = () -> responses;
421 List<TableEntry> tableEntryMsgs = StreamSupport
422 .stream(responseIterable.spliterator(), false)
423 .map(ReadResponse::getEntitiesList)
424 .flatMap(List::stream)
425 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
426 .map(Entity::getTableEntry)
427 .collect(Collectors.toList());
428
Carmelo Cascone9f007702017-08-24 13:30:51 +0200429 log.debug("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400430
431 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
432 }
433
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200434 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
435 try {
436 //encode the PiPacketOperation into a PacketOut
437 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
438
439 //Build the request
440 StreamMessageRequest packetOutRequest = StreamMessageRequest
441 .newBuilder().setPacket(packetOut).build();
442
443 //Send the request
444 streamRequestObserver.onNext(packetOutRequest);
445
446 } catch (P4InfoBrowser.NotFoundException e) {
447 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
448 log.debug("Exception", e);
449 return false;
450 }
451 return true;
452 }
453
Carmelo Casconea966c342017-07-30 01:56:30 -0400454 private void doPacketIn(PacketIn packetInMsg) {
455
456 // Retrieve the pipeconf for this client's device.
457 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
458 if (pipeconfService == null) {
459 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
460 }
461 final PiPipeconf pipeconf;
462 if (pipeconfService.ofDevice(deviceId).isPresent() &&
463 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
464 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
465 } else {
466 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
467 return;
468 }
469 // Decode packet message and post event.
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200470 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf);
471 DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
472 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400473 log.debug("Received packet in: {}", event);
474 controller.postEvent(event);
475 }
476
477 private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
478
479 log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg);
480 }
481
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200482 private Collection<PiCounterCellData> doReadCounterCells(Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
483
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200484 // We use this map to remember the original PI counter IDs of the returned response.
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200485 final Map<Integer, PiCounterId> counterIdMap = Maps.newHashMap();
486
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200487 final ReadRequest request = ReadRequest.newBuilder()
488 .setDeviceId(p4DeviceId)
489 .addAllEntities(CounterEntryCodec.encodePiCounterCellIds(cellIds, counterIdMap, pipeconf))
490 .build();
491
492 if (request.getEntitiesList().size() == 0) {
493 return Collections.emptyList();
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200494 }
495
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200496 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200497 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200498 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200499 } catch (StatusRuntimeException e) {
500 log.warn("Unable to read counters: {}", e.getMessage());
501 return Collections.emptyList();
502 }
503
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200504 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200505 .map(ReadResponse::getEntitiesList)
506 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200507 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200508
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200509 return CounterEntryCodec.decodeCounterEntities(entities, counterIdMap, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200510 }
511
Yi Tseng82512da2017-08-16 19:46:36 -0700512 private boolean doWriteActionGroupMembers(PiActionGroup group, Collection<PiActionGroupMember> members,
513 WriteOperationType opType, PiPipeconf pipeconf) {
514 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
515
516 Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
517 try {
518 for (PiActionGroupMember member : members) {
519 actionProfileMembers.add(
520 ActionProfileMemberEncoder.encode(group, member, pipeconf)
521 );
522 }
523 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
524 log.warn("Can't encode group member {} due to {}", members, e.getMessage());
525 return false;
526 }
527
528 Collection<Update> updateMsgs = actionProfileMembers.stream()
529 .map(actionProfileMember ->
530 Update.newBuilder()
531 .setEntity(Entity.newBuilder()
532 .setActionProfileMember(actionProfileMember)
533 .build())
534 .setType(UPDATE_TYPES.get(opType))
535 .build())
536 .collect(Collectors.toList());
537
538 if (updateMsgs.size() == 0) {
539 // Nothing to update
540 return true;
541 }
542
543 writeRequestBuilder
544 .setDeviceId(p4DeviceId)
545 .addAllUpdates(updateMsgs);
546 try {
547 blockingStub.write(writeRequestBuilder.build());
548 return true;
549 } catch (StatusRuntimeException e) {
550 log.warn("Unable to write table entries ({}): {}", opType, e.getMessage());
551 return false;
552 }
553 }
554
555 private Collection<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
556 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
557 piActionProfileId.id(), deviceId, pipeconf.id());
558 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
559 if (browser == null) {
560 log.warn("Unable to get a P4Info browser for pipeconf {}, skipping dump action profile {}",
561 pipeconf, piActionProfileId);
562 return Collections.emptySet();
563 }
564
565 int actionProfileId;
566 try {
567 P4InfoOuterClass.ActionProfile actionProfile =
568 browser.actionProfiles().getByName(piActionProfileId.id());
569 actionProfileId = actionProfile.getPreamble().getId();
570 } catch (P4InfoBrowser.NotFoundException e) {
571 log.warn("Can't find action profile {} from p4info", piActionProfileId);
572 return Collections.emptySet();
573 }
574
575 ActionProfileGroup actionProfileGroup =
576 ActionProfileGroup.newBuilder()
577 .setActionProfileId(actionProfileId)
578 .build();
579
580 ReadRequest requestMsg = ReadRequest.newBuilder()
581 .setDeviceId(p4DeviceId)
582 .addEntities(Entity.newBuilder()
583 .setActionProfileGroup(actionProfileGroup)
584 .build())
585 .build();
586
587 Iterator<ReadResponse> responses;
588 try {
589 responses = blockingStub.read(requestMsg);
590 } catch (StatusRuntimeException e) {
591 log.warn("Unable to read action profile {} due to {}", piActionProfileId, e.getMessage());
592 return Collections.emptySet();
593 }
594
595 List<ActionProfileGroup> actionProfileGroups =
596 Tools.stream(() -> responses)
597 .map(ReadResponse::getEntitiesList)
598 .flatMap(List::stream)
599 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
600 .map(Entity::getActionProfileGroup)
601 .collect(Collectors.toList());
602
603 log.debug("Retrieved {} groups from action profile {} on {}...",
604 actionProfileGroups.size(), piActionProfileId.id(), deviceId);
605
606 // group id -> members
607 Multimap<Integer, ActionProfileMember> actionProfileMemberMap = HashMultimap.create();
608 AtomicLong memberCount = new AtomicLong(0);
609 AtomicBoolean success = new AtomicBoolean(true);
610 actionProfileGroups.forEach(actProfGrp -> {
611 actProfGrp.getMembersList().forEach(member -> {
612 ActionProfileMember actProfMember =
613 ActionProfileMember.newBuilder()
614 .setActionProfileId(actProfGrp.getActionProfileId())
615 .setMemberId(member.getMemberId())
616 .build();
617 Entity entity = Entity.newBuilder()
618 .setActionProfileMember(actProfMember)
619 .build();
620
621 ReadRequest reqMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
622 .addEntities(entity)
623 .build();
624
625 Iterator<ReadResponse> resps;
626 try {
627 resps = blockingStub.read(reqMsg);
628 } catch (StatusRuntimeException e) {
629 log.warn("Unable to read member {} from action profile {} due to {}",
630 member, piActionProfileId, e.getMessage());
631 success.set(false);
632 return;
633 }
634 Tools.stream(() -> resps)
635 .map(ReadResponse::getEntitiesList)
636 .flatMap(List::stream)
637 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
638 .map(Entity::getActionProfileMember)
639 .forEach(m -> {
640 actionProfileMemberMap.put(actProfGrp.getGroupId(), m);
641 memberCount.incrementAndGet();
642 });
643 });
644 });
645
646 if (!success.get()) {
647 // Can't read members
648 return Collections.emptySet();
649 }
650 log.info("Retrieved {} group members from action profile {} on {}...",
651 memberCount.get(), piActionProfileId.id(), deviceId);
652
653 Collection<PiActionGroup> piActionGroups = Sets.newHashSet();
654
655 for (ActionProfileGroup apg : actionProfileGroups) {
656 try {
657 Collection<ActionProfileMember> members = actionProfileMemberMap.get(apg.getGroupId());
658 PiActionGroup decodedGroup =
659 ActionProfileGroupEncoder.decode(apg, members, pipeconf);
660 piActionGroups.add(decodedGroup);
661 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
662 log.warn("Can't decode group {} due to {}", apg, e.getMessage());
663 return Collections.emptySet();
664 }
665 }
666
667 return piActionGroups;
668 }
669
670 private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
671 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
672 ActionProfileGroup actionProfileGroup;
673 try {
674 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
675 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
676 log.warn("Can't encode group {} due to {}", e.getMessage());
677 return false;
678 }
679 Update updateMessage = Update.newBuilder()
680 .setEntity(Entity.newBuilder()
681 .setActionProfileGroup(actionProfileGroup)
682 .build())
683 .setType(UPDATE_TYPES.get(opType))
684 .build();
685 writeRequestBuilder
686 .setDeviceId(p4DeviceId)
687 .addUpdates(updateMessage);
688 try {
689 blockingStub.write(writeRequestBuilder.build());
690 return true;
691 } catch (StatusRuntimeException e) {
692 log.warn("Unable to write table entries ({}): {}", opType, e.getMessage());
693 return false;
694 }
695 }
696
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400697 /**
698 * Returns the internal P4 device ID associated with this client.
699 *
700 * @return P4 device ID
701 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200702 public long p4DeviceId() {
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400703 return p4DeviceId;
704 }
705
706 /**
707 * For testing purpose only. TODO: remove before release.
708 *
709 * @return blocking stub
710 */
711 public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
712 return this.blockingStub;
713 }
714
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200715
Andrea Campanella432f7182017-07-14 18:43:27 +0200716 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400717 public void shutdown() {
718
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400719 log.info("Shutting down client for {}...", deviceId);
720
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400721 writeLock.lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400722 try {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400723 if (streamRequestObserver != null) {
724 streamRequestObserver.onCompleted();
725 cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
726 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400727
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400728 this.executorService.shutdown();
729 try {
730 executorService.awaitTermination(5, TimeUnit.SECONDS);
731 } catch (InterruptedException e) {
732 log.warn("Executor service didn't shutdown in time.");
733 }
734 } finally {
735 writeLock.unlock();
736 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400737 }
738
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400739 /**
740 * Handles messages received from the device on the stream channel.
741 */
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400742 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
743
744 @Override
745 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400746 executorService.submit(() -> doNext(message));
747 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400748
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400749 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -0400750 try {
Andrea Campanella0288c872017-08-07 18:32:51 +0200751 log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -0400752 switch (message.getUpdateCase()) {
753 case PACKET:
754 // Packet-in
755 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +0200756 return;
Carmelo Casconea966c342017-07-30 01:56:30 -0400757 case ARBITRATION:
758 doArbitrationUpdateFromDevice(message.getArbitration());
759 return;
760 default:
761 log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
762 }
763 } catch (Throwable ex) {
764 log.error("Exception while processing stream channel message from {}", deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400765 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400766 }
767
768 @Override
769 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400770 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
771 // FIXME: we might want to recreate the channel.
772 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
773 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400774 }
775
776 @Override
777 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400778 log.warn("Stream channel for {} has completed", deviceId);
779 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400780 }
781 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400782}