blob: bb2d16bcb65e9bdbbe51f844e1816f22dc861f68 [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
19import com.google.common.collect.ImmutableList;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040021import com.google.protobuf.ByteString;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040022import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040023import io.grpc.ManagedChannel;
24import io.grpc.Status;
25import io.grpc.StatusRuntimeException;
26import io.grpc.stub.StreamObserver;
27import org.onlab.util.ImmutableByteSequence;
28import org.onosproject.net.DeviceId;
Andrea Campanella432f7182017-07-14 18:43:27 +020029import org.onosproject.net.pi.model.PiPipeconf;
30import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040031import org.onosproject.net.pi.runtime.PiTableEntry;
32import org.onosproject.net.pi.runtime.PiTableId;
33import org.onosproject.p4runtime.api.P4RuntimeClient;
34import org.onosproject.p4runtime.api.P4RuntimeEvent;
35import org.slf4j.Logger;
36import p4.P4RuntimeGrpc;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040037import p4.P4RuntimeOuterClass.Entity;
38import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
39import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
40import p4.P4RuntimeOuterClass.PacketIn;
41import p4.P4RuntimeOuterClass.ReadRequest;
42import p4.P4RuntimeOuterClass.ReadResponse;
43import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
44import p4.P4RuntimeOuterClass.StreamMessageRequest;
45import p4.P4RuntimeOuterClass.StreamMessageResponse;
46import p4.P4RuntimeOuterClass.TableEntry;
47import p4.P4RuntimeOuterClass.Uint128;
48import p4.P4RuntimeOuterClass.Update;
49import p4.P4RuntimeOuterClass.WriteRequest;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020050import p4.config.P4InfoOuterClass.P4Info;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040051import p4.tmp.P4Config;
52
53import java.io.IOException;
54import java.io.InputStream;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040055import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040056import java.util.Collections;
57import java.util.Iterator;
58import java.util.List;
59import java.util.Map;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040060import java.util.concurrent.CompletableFuture;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040061import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040062import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040063import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040064import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040065import java.util.concurrent.locks.Lock;
66import java.util.concurrent.locks.ReentrantLock;
67import java.util.function.Supplier;
68import java.util.stream.Collectors;
69import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040070
71import static org.onlab.util.ImmutableByteSequence.copyFrom;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040072import static org.onlab.util.Tools.groupedThreads;
73import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040074import static org.slf4j.LoggerFactory.getLogger;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040075import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020076import static p4.P4RuntimeOuterClass.PacketOut;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040077import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
78
79/**
80 * Implementation of a P4Runtime client.
81 */
Carmelo Cascone8d99b172017-07-18 17:26:31 -040082public final class P4RuntimeClientImpl implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040083
84 private static final int DEADLINE_SECONDS = 15;
85
Carmelo Cascone8d99b172017-07-18 17:26:31 -040086 // FIXME: use static election ID, since mastership arbitration is not yet support on BMv2 or Tofino.
87 private static final int ELECTION_ID = 1;
88
89 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
90 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
91 WriteOperationType.INSERT, Update.Type.INSERT,
92 WriteOperationType.MODIFY, Update.Type.MODIFY,
93 WriteOperationType.DELETE, Update.Type.DELETE
94 );
95
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040096 private final Logger log = getLogger(getClass());
97
98 private final DeviceId deviceId;
99 private final int p4DeviceId;
100 private final P4RuntimeControllerImpl controller;
101 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400102 private final Context.CancellableContext cancellableContext;
103 private final ExecutorService executorService;
104 private final Executor contextExecutor;
105 private final Lock writeLock = new ReentrantLock();
106 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400107
108
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400109 P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400110 this.deviceId = deviceId;
111 this.p4DeviceId = p4DeviceId;
112 this.controller = controller;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400113 this.cancellableContext = Context.current().withCancellation();
114 this.executorService = Executors.newFixedThreadPool(5, groupedThreads(
115 "onos/p4runtime-client-" + deviceId.toString(),
116 deviceId.toString() + "-%d"));
117 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400118 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
119 .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400120 P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
121 this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
122 }
123
124 /**
125 * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
126 * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
127 * <p>
128 * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
129 * <p>
130 */
131 private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier) {
132 return CompletableFuture.supplyAsync(() -> {
133 // TODO: explore a more relaxed locking strategy.
134 writeLock.lock();
135 try {
136 return supplier.get();
137 } finally {
138 writeLock.unlock();
139 }
140 }, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400141 }
142
143 @Override
144 public CompletableFuture<Boolean> initStreamChannel() {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400145 return supplyInContext(this::doInitStreamChannel);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400146 }
147
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400148 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400149 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
150 return supplyInContext(() -> doSetPipelineConfig(pipeconf, targetConfigExtType));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400151 }
152
153 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400154 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
155 WriteOperationType opType, PiPipeconf pipeconf) {
156 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400157 }
158
159 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400160 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
161 return supplyInContext(() -> doDumpTable(piTableId, pipeconf));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400162 }
163
164 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200165 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200166 return supplyInContext(() -> doPacketOut(packet, pipeconf));
Andrea Campanella432f7182017-07-14 18:43:27 +0200167 }
168
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400169 /* Blocking method implementations below */
170
171 private boolean doInitStreamChannel() {
172 // To listen for packets and other events, we need to start the RPC.
173 // Here we do it by sending a master arbitration update.
174 log.info("initializing stream chanel on {}...", deviceId);
175 if (!doArbitrationUpdate()) {
176 log.warn("Unable to initialize stream channel for {}", deviceId);
177 return false;
178 } else {
179 return true;
180 }
181 }
182
183 private boolean doArbitrationUpdate() {
184 log.info("Sending arbitration update to {}...", deviceId);
185 StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
186 .setArbitration(MasterArbitrationUpdate.newBuilder()
187 .setDeviceId(p4DeviceId)
188 .build())
189 .build();
190 try {
191 streamRequestObserver.onNext(requestMsg);
192 return true;
193 } catch (StatusRuntimeException e) {
194 log.warn("Arbitration update failed for {}: {}", deviceId, e);
195 return false;
196 }
197 }
198
199 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
200
201 log.info("Setting pipeline config for {} to {} using {}...", deviceId, pipeconf.id(), targetConfigExtType);
202
203 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
204 if (p4Info == null) {
205 // Problem logged by PipeconfHelper.
206 return false;
207 }
208
209 if (!pipeconf.extension(targetConfigExtType).isPresent()) {
210 log.warn("Missing extension {} in pipeconf {}", targetConfigExtType, pipeconf.id());
211 return false;
212 }
213
214 InputStream targetConfig = pipeconf.extension(targetConfigExtType).get();
215 P4Config.P4DeviceConfig p4DeviceConfigMsg;
216 try {
217 p4DeviceConfigMsg = P4Config.P4DeviceConfig
218 .newBuilder()
219 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
220 .setReassign(true)
221 .setDeviceData(ByteString.readFrom(targetConfig))
222 .build();
223 } catch (IOException ex) {
224 log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
225 return false;
226 }
227
228 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
229 .newBuilder()
230 .setAction(VERIFY_AND_COMMIT)
231 .addConfigs(ForwardingPipelineConfig
232 .newBuilder()
233 .setDeviceId(p4DeviceId)
234 .setP4Info(p4Info)
235 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
236 .build())
237 .build();
238
239 try {
240 this.blockingStub.setForwardingPipelineConfig(request);
241
242 } catch (StatusRuntimeException ex) {
243 log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
244 return false;
245 }
246
247 return true;
248 }
249
250 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
251 PiPipeconf pipeconf) {
252
253 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
254
255 Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
256 .stream()
257 .map(tableEntryMsg ->
258 Update.newBuilder()
259 .setEntity(Entity.newBuilder()
260 .setTableEntry(tableEntryMsg)
261 .build())
262 .setType(UPDATE_TYPES.get(opType))
263 .build())
264 .collect(Collectors.toList());
265
266 if (updateMsgs.size() == 0) {
267 return true;
268 }
269
270 writeRequestBuilder
271 .setDeviceId(p4DeviceId)
272 .setElectionId(Uint128.newBuilder()
273 .setHigh(0)
274 .setLow(ELECTION_ID)
275 .build())
276 .addAllUpdates(updateMsgs)
277 .build();
278
279 try {
280 blockingStub.write(writeRequestBuilder.build());
281 return true;
282 } catch (StatusRuntimeException e) {
283 log.warn("Unable to write table entries ({}): {}", opType, e.getMessage());
284 return false;
285 }
286 }
287
288 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
289
290 log.info("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
291
292 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
293 int tableId;
294 try {
295 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
296 } catch (P4InfoBrowser.NotFoundException e) {
297 log.warn("Unable to dump table: {}", e.getMessage());
298 return Collections.emptyList();
299 }
300
301 ReadRequest requestMsg = ReadRequest.newBuilder()
302 .setDeviceId(p4DeviceId)
303 .addEntities(Entity.newBuilder()
304 .setTableEntry(TableEntry.newBuilder()
305 .setTableId(tableId)
306 .build())
307 .build())
308 .build();
309
310 Iterator<ReadResponse> responses;
311 try {
312 responses = blockingStub.read(requestMsg);
313 } catch (StatusRuntimeException e) {
314 log.warn("Unable to dump table: {}", e.getMessage());
315 return Collections.emptyList();
316 }
317
318 Iterable<ReadResponse> responseIterable = () -> responses;
319 List<TableEntry> tableEntryMsgs = StreamSupport
320 .stream(responseIterable.spliterator(), false)
321 .map(ReadResponse::getEntitiesList)
322 .flatMap(List::stream)
323 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
324 .map(Entity::getTableEntry)
325 .collect(Collectors.toList());
326
327 log.info("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
328
329 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
330 }
331
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200332 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
333 try {
334 //encode the PiPacketOperation into a PacketOut
335 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
336
337 //Build the request
338 StreamMessageRequest packetOutRequest = StreamMessageRequest
339 .newBuilder().setPacket(packetOut).build();
340
341 //Send the request
342 streamRequestObserver.onNext(packetOutRequest);
343
344 } catch (P4InfoBrowser.NotFoundException e) {
345 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
346 log.debug("Exception", e);
347 return false;
348 }
349 return true;
350 }
351
352
Andrea Campanella432f7182017-07-14 18:43:27 +0200353 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400354 public void shutdown() {
355
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400356 log.info("Shutting down client for {}...", deviceId);
357
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400358 writeLock.lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400359 try {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400360 if (streamRequestObserver != null) {
361 streamRequestObserver.onCompleted();
362 cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
363 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400364
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400365 this.executorService.shutdown();
366 try {
367 executorService.awaitTermination(5, TimeUnit.SECONDS);
368 } catch (InterruptedException e) {
369 log.warn("Executor service didn't shutdown in time.");
370 }
371 } finally {
372 writeLock.unlock();
373 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400374 }
375
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400376 /**
377 * Handles messages received from the device on the stream channel.
378 */
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400379 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
380
381 @Override
382 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400383 executorService.submit(() -> doNext(message));
384 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400385
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400386 private void doNext(StreamMessageResponse message) {
387 log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
388 switch (message.getUpdateCase()) {
389 case PACKET:
390 // Packet-in
391 PacketIn packetIn = message.getPacket();
392 ImmutableByteSequence data = copyFrom(packetIn.getPayload().asReadOnlyByteBuffer());
393 ImmutableList.Builder<ImmutableByteSequence> metadataBuilder = ImmutableList.builder();
394 packetIn.getMetadataList().stream()
395 .map(m -> m.getValue().asReadOnlyByteBuffer())
396 .map(ImmutableByteSequence::copyFrom)
397 .forEach(metadataBuilder::add);
398 P4RuntimeEvent event = new DefaultPacketInEvent(deviceId, data, metadataBuilder.build());
399 controller.postEvent(event);
400 return;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400401
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400402 case ARBITRATION:
403 throw new UnsupportedOperationException("Arbitration not implemented.");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400404
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400405 default:
406 log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400407 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400408 }
409
410 @Override
411 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400412 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
413 // FIXME: we might want to recreate the channel.
414 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
415 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400416 }
417
418 @Override
419 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400420 log.warn("Stream channel for {} has completed", deviceId);
421 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400422 }
423 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400424}