/*
 * Copyright 2017-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.p4runtime.ctl;
18
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.TextFormat;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.onlab.util.ImmutableByteSequence;
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.runtime.PiTableEntry;
import org.onosproject.net.pi.runtime.PiTableId;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.slf4j.Logger;
import p4.P4RuntimeGrpc;
import p4.config.P4InfoOuterClass;
import p4.tmp.P4Config;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static org.onlab.util.ImmutableByteSequence.copyFrom;
import static org.slf4j.LoggerFactory.getLogger;
import static p4.P4RuntimeOuterClass.*;
import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
50
51/**
52 * Implementation of a P4Runtime client.
53 */
54public class P4RuntimeClientImpl implements P4RuntimeClient {
55
56 private static final int DEADLINE_SECONDS = 15;
57
58 private final Logger log = getLogger(getClass());
59
60 private final DeviceId deviceId;
61 private final int p4DeviceId;
62 private final P4RuntimeControllerImpl controller;
63 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
64 private final P4RuntimeGrpc.P4RuntimeStub asyncStub;
65 private ExecutorService executorService;
66 private StreamObserver<StreamMessageRequest> streamRequestObserver;
67
68
69 P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller,
70 ExecutorService executorService) {
71 this.deviceId = deviceId;
72 this.p4DeviceId = p4DeviceId;
73 this.controller = controller;
74 this.executorService = executorService;
75 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
76 .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
77 this.asyncStub = P4RuntimeGrpc.newStub(channel)
78 .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
79 }
80
81 @Override
82 public CompletableFuture<Boolean> initStreamChannel() {
83 return CompletableFuture.supplyAsync(this::doInitStreamChannel, executorService);
84 }
85
86 private boolean doInitStreamChannel() {
87 if (this.streamRequestObserver == null) {
88 this.streamRequestObserver = this.asyncStub.streamChannel(new StreamChannelResponseObserver());
89 // To listen for packets and other events, we need to start the RPC.
90 // Here we do it by sending an empty packet out.
91 try {
92 this.streamRequestObserver.onNext(StreamMessageRequest.newBuilder()
93 .setPacket(PacketOut.getDefaultInstance())
94 .build());
95 } catch (StatusRuntimeException e) {
96 log.warn("Unable to initialize stream channel for {}: {}", deviceId, e);
97 return false;
98 }
99 }
100 return true;
101 }
102
103 @Override
104 public CompletableFuture<Boolean> setPipelineConfig(InputStream p4info, InputStream targetConfig) {
105 return CompletableFuture.supplyAsync(() -> doSetPipelineConfig(p4info, targetConfig), executorService);
106 }
107
108 private boolean doSetPipelineConfig(InputStream p4info, InputStream targetConfig) {
109
110 log.debug("Setting pipeline config for {}", deviceId);
111
112 P4InfoOuterClass.P4Info.Builder p4iInfoBuilder = P4InfoOuterClass.P4Info.newBuilder();
113
114 try {
115 TextFormat.getParser().merge(new InputStreamReader(p4info),
116 ExtensionRegistry.getEmptyRegistry(),
117 p4iInfoBuilder);
118 } catch (IOException ex) {
119 log.warn("Unable to load p4info for {}: {}", deviceId, ex.getMessage());
120 return false;
121 }
122
123 P4Config.P4DeviceConfig deviceIdConfig;
124 try {
125 deviceIdConfig = P4Config.P4DeviceConfig
126 .newBuilder()
127 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
128 .setReassign(true)
129 .setDeviceData(ByteString.readFrom(targetConfig))
130 .build();
131 } catch (IOException ex) {
132 log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
133 return false;
134 }
135
136 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
137 .newBuilder()
138 .setAction(VERIFY_AND_COMMIT)
139 .addConfigs(ForwardingPipelineConfig
140 .newBuilder()
141 .setDeviceId(p4DeviceId)
142 .setP4Info(p4iInfoBuilder.build())
143 .setP4DeviceConfig(deviceIdConfig.toByteString())
144 .build())
145 .build();
146 try {
147 this.blockingStub.setForwardingPipelineConfig(request);
148 } catch (StatusRuntimeException ex) {
149 log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
150 return false;
151 }
152
153 return true;
154 }
155
156 @Override
157 public boolean writeTableEntries(Collection<PiTableEntry> entries, WriteOperationType opType) {
158
159 throw new UnsupportedOperationException("writeTableEntries not implemented.");
160 }
161
162 @Override
163 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId tableId) {
164
165 throw new UnsupportedOperationException("dumpTable not implemented.");
166 }
167
168 @Override
169 public void shutdown() {
170
171 if (this.streamRequestObserver != null) {
172 this.streamRequestObserver.onError(new StatusRuntimeException(Status.CANCELLED));
173 this.streamRequestObserver.onCompleted();
174 }
175
176 this.executorService.shutdownNow();
177 try {
178 executorService.awaitTermination(5, TimeUnit.SECONDS);
179 } catch (InterruptedException e) {
180 log.warn("Executor service didn't shutdown in time.");
181 }
182
183 // Prevent the execution of other tasks.
184 executorService = null;
185 }
186
187 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
188
189 @Override
190 public void onNext(StreamMessageResponse message) {
191
192 P4RuntimeEvent event;
193
194 if (message.getPacket().isInitialized()) {
195 // Packet-in
196 PacketIn packetIn = message.getPacket();
197 ImmutableByteSequence data = copyFrom(packetIn.getPayload().asReadOnlyByteBuffer());
198 ImmutableList.Builder<ImmutableByteSequence> metadataBuilder = ImmutableList.builder();
199 packetIn.getMetadataList().stream()
200 .map(m -> m.getValue().asReadOnlyByteBuffer())
201 .map(ImmutableByteSequence::copyFrom)
202 .forEach(metadataBuilder::add);
203 event = new DefaultPacketInEvent(deviceId, data, metadataBuilder.build());
204
205 } else if (message.getArbitration().isInitialized()) {
206 // Arbitration.
207 throw new UnsupportedOperationException("Arbitration not implemented.");
208
209 } else {
210 log.warn("Unrecognized stream message from {}: {}", deviceId, message);
211 return;
212 }
213
214 controller.postEvent(event);
215 }
216
217 @Override
218 public void onError(Throwable throwable) {
219 log.warn("Error on stream channel for {}: {}", deviceId, throwable);
220 }
221
222 @Override
223 public void onCompleted() {
224 // TODO: declare the device as disconnected?
225 }
226 }
227
228}