blob: 132689a241f32e13285eb37e026e26482077a3d1 [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
19import com.google.common.collect.ImmutableList;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040021import com.google.protobuf.ByteString;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040022import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040023import io.grpc.ManagedChannel;
24import io.grpc.Status;
25import io.grpc.StatusRuntimeException;
26import io.grpc.stub.StreamObserver;
27import org.onlab.util.ImmutableByteSequence;
28import org.onosproject.net.DeviceId;
Andrea Campanella432f7182017-07-14 18:43:27 +020029import org.onosproject.net.pi.model.PiPipeconf;
30import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040031import org.onosproject.net.pi.runtime.PiTableEntry;
32import org.onosproject.net.pi.runtime.PiTableId;
33import org.onosproject.p4runtime.api.P4RuntimeClient;
34import org.onosproject.p4runtime.api.P4RuntimeEvent;
35import org.slf4j.Logger;
36import p4.P4RuntimeGrpc;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040037import p4.P4RuntimeOuterClass.Entity;
38import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
39import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
40import p4.P4RuntimeOuterClass.PacketIn;
41import p4.P4RuntimeOuterClass.ReadRequest;
42import p4.P4RuntimeOuterClass.ReadResponse;
43import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
44import p4.P4RuntimeOuterClass.StreamMessageRequest;
45import p4.P4RuntimeOuterClass.StreamMessageResponse;
46import p4.P4RuntimeOuterClass.TableEntry;
47import p4.P4RuntimeOuterClass.Uint128;
48import p4.P4RuntimeOuterClass.Update;
49import p4.P4RuntimeOuterClass.WriteRequest;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040050import p4.tmp.P4Config;
51
52import java.io.IOException;
53import java.io.InputStream;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040054import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040055import java.util.Collections;
56import java.util.Iterator;
57import java.util.List;
58import java.util.Map;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040059import java.util.concurrent.CompletableFuture;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040060import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040061import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040062import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040063import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040064import java.util.concurrent.locks.Lock;
65import java.util.concurrent.locks.ReentrantLock;
66import java.util.function.Supplier;
67import java.util.stream.Collectors;
68import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040069
70import static org.onlab.util.ImmutableByteSequence.copyFrom;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040071import static org.onlab.util.Tools.groupedThreads;
72import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040073import static org.slf4j.LoggerFactory.getLogger;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040074import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040075import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
Andrea Campanella432f7182017-07-14 18:43:27 +020076import static p4.config.P4InfoOuterClass.P4Info;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040077
78/**
79 * Implementation of a P4Runtime client.
80 */
Carmelo Cascone8d99b172017-07-18 17:26:31 -040081public final class P4RuntimeClientImpl implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040082
83 private static final int DEADLINE_SECONDS = 15;
84
Carmelo Cascone8d99b172017-07-18 17:26:31 -040085 // FIXME: use static election ID, since mastership arbitration is not yet support on BMv2 or Tofino.
86 private static final int ELECTION_ID = 1;
87
88 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
89 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
90 WriteOperationType.INSERT, Update.Type.INSERT,
91 WriteOperationType.MODIFY, Update.Type.MODIFY,
92 WriteOperationType.DELETE, Update.Type.DELETE
93 );
94
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040095 private final Logger log = getLogger(getClass());
96
97 private final DeviceId deviceId;
98 private final int p4DeviceId;
99 private final P4RuntimeControllerImpl controller;
100 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400101 private final Context.CancellableContext cancellableContext;
102 private final ExecutorService executorService;
103 private final Executor contextExecutor;
104 private final Lock writeLock = new ReentrantLock();
105 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400106
107
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400108 P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400109 this.deviceId = deviceId;
110 this.p4DeviceId = p4DeviceId;
111 this.controller = controller;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400112 this.cancellableContext = Context.current().withCancellation();
113 this.executorService = Executors.newFixedThreadPool(5, groupedThreads(
114 "onos/p4runtime-client-" + deviceId.toString(),
115 deviceId.toString() + "-%d"));
116 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400117 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
118 .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400119 P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
120 this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
121 }
122
123 /**
124 * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
125 * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
126 * <p>
127 * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
128 * <p>
129 */
130 private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier) {
131 return CompletableFuture.supplyAsync(() -> {
132 // TODO: explore a more relaxed locking strategy.
133 writeLock.lock();
134 try {
135 return supplier.get();
136 } finally {
137 writeLock.unlock();
138 }
139 }, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400140 }
141
142 @Override
143 public CompletableFuture<Boolean> initStreamChannel() {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400144 return supplyInContext(this::doInitStreamChannel);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400145 }
146
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400147 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400148 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
149 return supplyInContext(() -> doSetPipelineConfig(pipeconf, targetConfigExtType));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400150 }
151
152 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400153 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
154 WriteOperationType opType, PiPipeconf pipeconf) {
155 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400156 }
157
158 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400159 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
160 return supplyInContext(() -> doDumpTable(piTableId, pipeconf));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400161 }
162
163 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200164 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
165 CompletableFuture<Boolean> result = new CompletableFuture<>();
166// P4InfoBrowser browser = null; //PipeconfHelper.getP4InfoBrowser(pipeconf);
167// try {
168// ControllerPacketMetadata controllerPacketMetadata =
169// browser.controllerPacketMetadatas().getByName("packet_out");
170// PacketOut.Builder packetOutBuilder = PacketOut.newBuilder();
171// packetOutBuilder.addAllMetadata(packet.metadatas().stream().map(metadata -> {
172// //FIXME we are assuming that there is no more than one metadata per name.
173// int metadataId = controllerPacketMetadata.getMetadataList().stream().filter(metadataInfo -> {
174// return metadataInfo.getName().equals(metadata.id().name());
175// }).findFirst().get().getId();
176// return PacketMetadata.newBuilder()
177// .setMetadataId(metadataId)
178// .setValue(ByteString.copyFrom(metadata.value().asReadOnlyBuffer()))
179// .build();
180// }).filter(Objects::nonNull).collect(Collectors.toList()));
181// packetOutBuilder.setPayload(ByteString.copyFrom(packet.data().asReadOnlyBuffer()));
182// PacketOut packetOut = packetOutBuilder.build();
183// StreamMessageRequest packetOutRequest = StreamMessageRequest
184// .newBuilder().setPacket(packetOut).build();
185// streamRequestObserver.onNext(packetOutRequest);
186// result.complete(true);
187// } catch (P4InfoBrowser.NotFoundException e) {
188// log.error("Cant find metadata with name \"packet_out\" in p4Info file.");
189// result.complete(false);
190// }
191 return result;
192 }
193
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400194 /* Blocking method implementations below */
195
196 private boolean doInitStreamChannel() {
197 // To listen for packets and other events, we need to start the RPC.
198 // Here we do it by sending a master arbitration update.
199 log.info("initializing stream chanel on {}...", deviceId);
200 if (!doArbitrationUpdate()) {
201 log.warn("Unable to initialize stream channel for {}", deviceId);
202 return false;
203 } else {
204 return true;
205 }
206 }
207
208 private boolean doArbitrationUpdate() {
209 log.info("Sending arbitration update to {}...", deviceId);
210 StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
211 .setArbitration(MasterArbitrationUpdate.newBuilder()
212 .setDeviceId(p4DeviceId)
213 .build())
214 .build();
215 try {
216 streamRequestObserver.onNext(requestMsg);
217 return true;
218 } catch (StatusRuntimeException e) {
219 log.warn("Arbitration update failed for {}: {}", deviceId, e);
220 return false;
221 }
222 }
223
224 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
225
226 log.info("Setting pipeline config for {} to {} using {}...", deviceId, pipeconf.id(), targetConfigExtType);
227
228 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
229 if (p4Info == null) {
230 // Problem logged by PipeconfHelper.
231 return false;
232 }
233
234 if (!pipeconf.extension(targetConfigExtType).isPresent()) {
235 log.warn("Missing extension {} in pipeconf {}", targetConfigExtType, pipeconf.id());
236 return false;
237 }
238
239 InputStream targetConfig = pipeconf.extension(targetConfigExtType).get();
240 P4Config.P4DeviceConfig p4DeviceConfigMsg;
241 try {
242 p4DeviceConfigMsg = P4Config.P4DeviceConfig
243 .newBuilder()
244 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
245 .setReassign(true)
246 .setDeviceData(ByteString.readFrom(targetConfig))
247 .build();
248 } catch (IOException ex) {
249 log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
250 return false;
251 }
252
253 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
254 .newBuilder()
255 .setAction(VERIFY_AND_COMMIT)
256 .addConfigs(ForwardingPipelineConfig
257 .newBuilder()
258 .setDeviceId(p4DeviceId)
259 .setP4Info(p4Info)
260 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
261 .build())
262 .build();
263
264 try {
265 this.blockingStub.setForwardingPipelineConfig(request);
266
267 } catch (StatusRuntimeException ex) {
268 log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
269 return false;
270 }
271
272 return true;
273 }
274
275 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
276 PiPipeconf pipeconf) {
277
278 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
279
280 Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
281 .stream()
282 .map(tableEntryMsg ->
283 Update.newBuilder()
284 .setEntity(Entity.newBuilder()
285 .setTableEntry(tableEntryMsg)
286 .build())
287 .setType(UPDATE_TYPES.get(opType))
288 .build())
289 .collect(Collectors.toList());
290
291 if (updateMsgs.size() == 0) {
292 return true;
293 }
294
295 writeRequestBuilder
296 .setDeviceId(p4DeviceId)
297 .setElectionId(Uint128.newBuilder()
298 .setHigh(0)
299 .setLow(ELECTION_ID)
300 .build())
301 .addAllUpdates(updateMsgs)
302 .build();
303
304 try {
305 blockingStub.write(writeRequestBuilder.build());
306 return true;
307 } catch (StatusRuntimeException e) {
308 log.warn("Unable to write table entries ({}): {}", opType, e.getMessage());
309 return false;
310 }
311 }
312
313 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
314
315 log.info("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
316
317 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
318 int tableId;
319 try {
320 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
321 } catch (P4InfoBrowser.NotFoundException e) {
322 log.warn("Unable to dump table: {}", e.getMessage());
323 return Collections.emptyList();
324 }
325
326 ReadRequest requestMsg = ReadRequest.newBuilder()
327 .setDeviceId(p4DeviceId)
328 .addEntities(Entity.newBuilder()
329 .setTableEntry(TableEntry.newBuilder()
330 .setTableId(tableId)
331 .build())
332 .build())
333 .build();
334
335 Iterator<ReadResponse> responses;
336 try {
337 responses = blockingStub.read(requestMsg);
338 } catch (StatusRuntimeException e) {
339 log.warn("Unable to dump table: {}", e.getMessage());
340 return Collections.emptyList();
341 }
342
343 Iterable<ReadResponse> responseIterable = () -> responses;
344 List<TableEntry> tableEntryMsgs = StreamSupport
345 .stream(responseIterable.spliterator(), false)
346 .map(ReadResponse::getEntitiesList)
347 .flatMap(List::stream)
348 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
349 .map(Entity::getTableEntry)
350 .collect(Collectors.toList());
351
352 log.info("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
353
354 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
355 }
356
Andrea Campanella432f7182017-07-14 18:43:27 +0200357 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400358 public void shutdown() {
359
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400360 log.info("Shutting down client for {}...", deviceId);
361
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400362 writeLock.lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400363 try {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400364 if (streamRequestObserver != null) {
365 streamRequestObserver.onCompleted();
366 cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
367 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400368
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400369 this.executorService.shutdown();
370 try {
371 executorService.awaitTermination(5, TimeUnit.SECONDS);
372 } catch (InterruptedException e) {
373 log.warn("Executor service didn't shutdown in time.");
374 }
375 } finally {
376 writeLock.unlock();
377 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400378 }
379
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400380 /**
381 * Handles messages received from the device on the stream channel.
382 */
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400383 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
384
385 @Override
386 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400387 executorService.submit(() -> doNext(message));
388 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400389
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400390 private void doNext(StreamMessageResponse message) {
391 log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
392 switch (message.getUpdateCase()) {
393 case PACKET:
394 // Packet-in
395 PacketIn packetIn = message.getPacket();
396 ImmutableByteSequence data = copyFrom(packetIn.getPayload().asReadOnlyByteBuffer());
397 ImmutableList.Builder<ImmutableByteSequence> metadataBuilder = ImmutableList.builder();
398 packetIn.getMetadataList().stream()
399 .map(m -> m.getValue().asReadOnlyByteBuffer())
400 .map(ImmutableByteSequence::copyFrom)
401 .forEach(metadataBuilder::add);
402 P4RuntimeEvent event = new DefaultPacketInEvent(deviceId, data, metadataBuilder.build());
403 controller.postEvent(event);
404 return;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400405
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400406 case ARBITRATION:
407 throw new UnsupportedOperationException("Arbitration not implemented.");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400408
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400409 default:
410 log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400411 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400412 }
413
414 @Override
415 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400416 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
417 // FIXME: we might want to recreate the channel.
418 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
419 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400420 }
421
422 @Override
423 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400424 log.warn("Stream channel for {} has completed", deviceId);
425 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400426 }
427 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400428}