blob: 35e9726f32677ba9a883143b28bad9545e4c5012 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.p4runtime.ctl;
18
19import com.google.common.collect.ImmutableList;
20import com.google.protobuf.ByteString;
21import com.google.protobuf.ExtensionRegistry;
22import com.google.protobuf.TextFormat;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040023import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040024import io.grpc.ManagedChannel;
25import io.grpc.Status;
26import io.grpc.StatusRuntimeException;
27import io.grpc.stub.StreamObserver;
28import org.onlab.util.ImmutableByteSequence;
29import org.onosproject.net.DeviceId;
30import org.onosproject.net.pi.runtime.PiTableEntry;
31import org.onosproject.net.pi.runtime.PiTableId;
32import org.onosproject.p4runtime.api.P4RuntimeClient;
33import org.onosproject.p4runtime.api.P4RuntimeEvent;
34import org.slf4j.Logger;
35import p4.P4RuntimeGrpc;
36import p4.config.P4InfoOuterClass;
37import p4.tmp.P4Config;
38
39import java.io.IOException;
40import java.io.InputStream;
41import java.io.InputStreamReader;
42import java.util.Collection;
43import java.util.concurrent.CompletableFuture;
44import java.util.concurrent.ExecutorService;
45import java.util.concurrent.TimeUnit;
46
47import static org.onlab.util.ImmutableByteSequence.copyFrom;
48import static org.slf4j.LoggerFactory.getLogger;
49import static p4.P4RuntimeOuterClass.*;
50import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
51
52/**
53 * Implementation of a P4Runtime client.
54 */
55public class P4RuntimeClientImpl implements P4RuntimeClient {
56
57 private static final int DEADLINE_SECONDS = 15;
58
59 private final Logger log = getLogger(getClass());
60
61 private final DeviceId deviceId;
62 private final int p4DeviceId;
63 private final P4RuntimeControllerImpl controller;
64 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
65 private final P4RuntimeGrpc.P4RuntimeStub asyncStub;
66 private ExecutorService executorService;
67 private StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040068 private Context.CancellableContext streamContext;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040069
70
71 P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller,
72 ExecutorService executorService) {
73 this.deviceId = deviceId;
74 this.p4DeviceId = p4DeviceId;
75 this.controller = controller;
76 this.executorService = executorService;
77 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
78 .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
Carmelo Cascone59f57de2017-07-11 19:55:09 -040079 this.asyncStub = P4RuntimeGrpc.newStub(channel);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040080 }
81
82 @Override
83 public CompletableFuture<Boolean> initStreamChannel() {
84 return CompletableFuture.supplyAsync(this::doInitStreamChannel, executorService);
85 }
86
87 private boolean doInitStreamChannel() {
88 if (this.streamRequestObserver == null) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -040089
90 streamContext = Context.current().withCancellation();
91 streamContext.run(
92 () -> streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver()));
93
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040094 // To listen for packets and other events, we need to start the RPC.
Carmelo Cascone59f57de2017-07-11 19:55:09 -040095 // Here we do it by sending a master arbitration update.
96 if (!doArbitrationUpdate()) {
97 log.warn("Unable to initialize stream channel for {}", deviceId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040098 return false;
99 }
100 }
101 return true;
102 }
103
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400104 private boolean doArbitrationUpdate() {
105
106 if (streamRequestObserver == null) {
107 log.error("Null request stream observer for {}", deviceId);
108 return false;
109 }
110
111 try {
112 StreamMessageRequest initRequest = StreamMessageRequest
113 .newBuilder()
114 .setArbitration(MasterArbitrationUpdate
115 .newBuilder()
116 .setDeviceId(p4DeviceId)
117 .build())
118 .build();
119 streamRequestObserver.onNext(initRequest);
120 return true;
121 } catch (StatusRuntimeException e) {
122 log.warn("Arbitration update failed for {}: {}", deviceId, e);
123 return false;
124 }
125 }
126
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400127 @Override
128 public CompletableFuture<Boolean> setPipelineConfig(InputStream p4info, InputStream targetConfig) {
129 return CompletableFuture.supplyAsync(() -> doSetPipelineConfig(p4info, targetConfig), executorService);
130 }
131
132 private boolean doSetPipelineConfig(InputStream p4info, InputStream targetConfig) {
133
134 log.debug("Setting pipeline config for {}", deviceId);
135
136 P4InfoOuterClass.P4Info.Builder p4iInfoBuilder = P4InfoOuterClass.P4Info.newBuilder();
137
138 try {
139 TextFormat.getParser().merge(new InputStreamReader(p4info),
140 ExtensionRegistry.getEmptyRegistry(),
141 p4iInfoBuilder);
142 } catch (IOException ex) {
143 log.warn("Unable to load p4info for {}: {}", deviceId, ex.getMessage());
144 return false;
145 }
146
147 P4Config.P4DeviceConfig deviceIdConfig;
148 try {
149 deviceIdConfig = P4Config.P4DeviceConfig
150 .newBuilder()
151 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
152 .setReassign(true)
153 .setDeviceData(ByteString.readFrom(targetConfig))
154 .build();
155 } catch (IOException ex) {
156 log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
157 return false;
158 }
159
160 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
161 .newBuilder()
162 .setAction(VERIFY_AND_COMMIT)
163 .addConfigs(ForwardingPipelineConfig
164 .newBuilder()
165 .setDeviceId(p4DeviceId)
166 .setP4Info(p4iInfoBuilder.build())
167 .setP4DeviceConfig(deviceIdConfig.toByteString())
168 .build())
169 .build();
170 try {
171 this.blockingStub.setForwardingPipelineConfig(request);
172 } catch (StatusRuntimeException ex) {
173 log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
174 return false;
175 }
176
177 return true;
178 }
179
180 @Override
181 public boolean writeTableEntries(Collection<PiTableEntry> entries, WriteOperationType opType) {
182
183 throw new UnsupportedOperationException("writeTableEntries not implemented.");
184 }
185
186 @Override
187 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId tableId) {
188
189 throw new UnsupportedOperationException("dumpTable not implemented.");
190 }
191
192 @Override
193 public void shutdown() {
194
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400195 log.info("Shutting down client for {}...", deviceId);
196
197 if (streamRequestObserver != null) {
198 streamRequestObserver.onCompleted();
199 streamContext.cancel(null);
200 streamContext = null;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400201 }
202
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400203 this.executorService.shutdown();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400204 try {
205 executorService.awaitTermination(5, TimeUnit.SECONDS);
206 } catch (InterruptedException e) {
207 log.warn("Executor service didn't shutdown in time.");
208 }
209
210 // Prevent the execution of other tasks.
211 executorService = null;
212 }
213
214 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
215
216 @Override
217 public void onNext(StreamMessageResponse message) {
218
219 P4RuntimeEvent event;
220
221 if (message.getPacket().isInitialized()) {
222 // Packet-in
223 PacketIn packetIn = message.getPacket();
224 ImmutableByteSequence data = copyFrom(packetIn.getPayload().asReadOnlyByteBuffer());
225 ImmutableList.Builder<ImmutableByteSequence> metadataBuilder = ImmutableList.builder();
226 packetIn.getMetadataList().stream()
227 .map(m -> m.getValue().asReadOnlyByteBuffer())
228 .map(ImmutableByteSequence::copyFrom)
229 .forEach(metadataBuilder::add);
230 event = new DefaultPacketInEvent(deviceId, data, metadataBuilder.build());
231
232 } else if (message.getArbitration().isInitialized()) {
233 // Arbitration.
234 throw new UnsupportedOperationException("Arbitration not implemented.");
235
236 } else {
237 log.warn("Unrecognized stream message from {}: {}", deviceId, message);
238 return;
239 }
240
241 controller.postEvent(event);
242 }
243
244 @Override
245 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400246 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
247 // FIXME: we might want to recreate the channel.
248 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
249 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400250 }
251
252 @Override
253 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400254 log.warn("Stream channel for {} has completed", deviceId);
255 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400256 }
257 }
258
259}