blob: df4844b0e9d93b13ed7d1bd53cecf4bbef839c4d [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.p4runtime.ctl.client;
18
import com.google.protobuf.TextFormat;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceAgentEvent;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.runtime.PiPacketOperation;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.api.P4RuntimeStreamClient;
import org.onosproject.p4runtime.ctl.codec.CodecException;
import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore.MasterElectionIdListener;
import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
import org.onosproject.p4runtime.ctl.controller.PacketInEvent;
import org.slf4j.Logger;
import p4.v1.P4RuntimeOuterClass;
import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;

import java.math.BigInteger;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
import static org.slf4j.LoggerFactory.getLogger;
51
52/**
53 * Implementation of P4RuntimeStreamClient. Handles P4Runtime StreamChannel RPC
54 * operations, such as arbitration update and packet-in/out.
55 */
56public final class StreamClientImpl implements P4RuntimeStreamClient {
57
58 private static final Logger log = getLogger(StreamClientImpl.class);
59
Carmelo Cascone4c289b72019-01-22 15:30:45 -080060 private final P4RuntimeClientImpl client;
61 private final DeviceId deviceId;
62 private final long p4DeviceId;
63 private final PiPipeconfService pipeconfService;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080064 private final MasterElectionIdStore masterElectionIdStore;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080065 private final P4RuntimeControllerImpl controller;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080066
Carmelo Cascone4c289b72019-01-22 15:30:45 -080067 private final StreamChannelManager streamChannelManager = new StreamChannelManager();
Carmelo Cascone3977ea42019-02-28 13:43:42 -080068 private final MasterElectionIdListener masterElectionIdListener = new InternalMasterElectionIdListener();
Carmelo Cascone4c289b72019-01-22 15:30:45 -080069
Carmelo Cascone3977ea42019-02-28 13:43:42 -080070 private final AtomicBoolean isMaster = new AtomicBoolean(false);
71 private final AtomicBoolean requestedToBeMaster = new AtomicBoolean(false);
Carmelo Cascone4c289b72019-01-22 15:30:45 -080072
Carmelo Cascone3977ea42019-02-28 13:43:42 -080073 private BigInteger pendingElectionId = null;
74 private BigInteger lastUsedElectionId = null;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080075
76 StreamClientImpl(
77 PiPipeconfService pipeconfService,
Carmelo Cascone3977ea42019-02-28 13:43:42 -080078 MasterElectionIdStore masterElectionIdStore,
Carmelo Cascone4c289b72019-01-22 15:30:45 -080079 P4RuntimeClientImpl client,
80 P4RuntimeControllerImpl controller) {
81 this.client = client;
82 this.deviceId = client.deviceId();
83 this.p4DeviceId = client.p4DeviceId();
84 this.pipeconfService = pipeconfService;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080085 this.masterElectionIdStore = masterElectionIdStore;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080086 this.controller = controller;
87 }
88
89 @Override
Carmelo Cascone4c289b72019-01-22 15:30:45 -080090 public boolean isSessionOpen() {
91 return streamChannelManager.isOpen();
92 }
93
94 @Override
95 public void closeSession() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -080096 synchronized (requestedToBeMaster) {
97 this.masterElectionIdStore.unsetListener(deviceId);
98 streamChannelManager.teardown();
99 pendingElectionId = null;
100 requestedToBeMaster.set(false);
101 isMaster.set(false);
102 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800103 }
104
105 @Override
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800106 public void setMastership(boolean master, BigInteger newElectionId) {
107 checkNotNull(newElectionId);
108 checkArgument(newElectionId.compareTo(BigInteger.ZERO) > 0,
109 "newElectionId must be a non zero positive number");
110 synchronized (requestedToBeMaster) {
111 requestedToBeMaster.set(master);
112 pendingElectionId = newElectionId;
113 handlePendingElectionId(masterElectionIdStore.get(deviceId));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800114 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800115 }
116
117 private void handlePendingElectionId(BigInteger masterElectionId) {
118 synchronized (requestedToBeMaster) {
119 if (pendingElectionId == null) {
120 // No pending requests.
121 return;
122 }
123 if (!requestedToBeMaster.get() && masterElectionId != null
124 && pendingElectionId.compareTo(masterElectionId) > 0) {
125 log.info("Deferring sending master arbitration update, master " +
126 "election ID of server ({}) is smaller than " +
127 "requested one ({}), but we do NOT want to be master...",
128 masterElectionId, pendingElectionId);
129 // Will try again as soon as the server reports a new master
130 // election ID that is bigger than the pending non-master one.
131 masterElectionIdStore.setListener(deviceId, masterElectionIdListener);
132 } else {
133 // Send now.
134 log.info("Setting mastership on {}... " +
135 "master={}, newElectionId={}, masterElectionId={}",
136 deviceId, requestedToBeMaster.get(),
137 pendingElectionId, masterElectionId);
138 sendMasterArbitrationUpdate(pendingElectionId);
139 pendingElectionId = null;
140 // No need to listen for master election ID changes.
141 masterElectionIdStore.unsetListener(deviceId);
142 }
143 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800144 }
145
146 @Override
147 public boolean isMaster() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800148 return isMaster.get();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800149 }
150
151 @Override
152 public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
153 if (!isSessionOpen()) {
154 log.debug("Dropping packet-out request for {}, session is closed",
155 deviceId);
156 return;
157 }
158 if (log.isTraceEnabled()) {
159 log.trace("Sending packet-out to {}: {}", deviceId, packet);
160 }
161 try {
162 // Encode the PiPacketOperation into a PacketOut
163 final P4RuntimeOuterClass.PacketOut packetOut =
164 CODECS.packetOut().encode(packet, null, pipeconf);
165 // Build the request
166 final StreamMessageRequest packetOutRequest = StreamMessageRequest
167 .newBuilder().setPacket(packetOut).build();
168 // Send.
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800169 streamChannelManager.send(packetOutRequest);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800170 } catch (CodecException e) {
171 log.error("Unable to send packet-out: {}", e.getMessage());
172 }
173 }
174
175 private void sendMasterArbitrationUpdate(BigInteger electionId) {
176 log.debug("Sending arbitration update to {}... electionId={}",
177 deviceId, electionId);
178 final P4RuntimeOuterClass.Uint128 idMsg = bigIntegerToUint128(electionId);
179 streamChannelManager.send(
180 StreamMessageRequest.newBuilder()
181 .setArbitration(
182 P4RuntimeOuterClass.MasterArbitrationUpdate
183 .newBuilder()
184 .setDeviceId(p4DeviceId)
185 .setElectionId(idMsg)
186 .build())
187 .build());
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800188 lastUsedElectionId = electionId;
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800189 }
190
191 private P4RuntimeOuterClass.Uint128 bigIntegerToUint128(BigInteger value) {
192 final byte[] arr = value.toByteArray();
193 final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
194 .put(new byte[Long.BYTES * 2 - arr.length])
195 .put(arr);
196 bb.rewind();
197 return P4RuntimeOuterClass.Uint128.newBuilder()
198 .setHigh(bb.getLong())
199 .setLow(bb.getLong())
200 .build();
201 }
202
203 private BigInteger uint128ToBigInteger(P4RuntimeOuterClass.Uint128 value) {
204 return new BigInteger(
205 ByteBuffer.allocate(Long.BYTES * 2)
206 .putLong(value.getHigh())
207 .putLong(value.getLow())
208 .array());
209 }
210
211 private void handlePacketIn(P4RuntimeOuterClass.PacketIn packetInMsg) {
212 if (log.isTraceEnabled()) {
213 log.trace("Received packet-in from {}: {}", deviceId, packetInMsg);
214 }
215 if (!pipeconfService.getPipeconf(deviceId).isPresent()) {
216 log.warn("Unable to handle packet-in from {}, missing pipeconf: {}",
217 deviceId, TextFormat.shortDebugString(packetInMsg));
218 return;
219 }
220 // Decode packet message and post event.
221 // TODO: consider implementing a cache to speed up
222 // encoding/deconding of packet-in/out (e.g. LLDP, ARP)
223 final PiPipeconf pipeconf = pipeconfService.getPipeconf(deviceId).get();
224 final PiPacketOperation pktOperation;
225 try {
226 pktOperation = CODECS.packetIn().decode(
227 packetInMsg, null, pipeconf);
228 } catch (CodecException e) {
229 log.warn("Unable to process packet-int: {}", e.getMessage());
230 return;
231 }
232 controller.postEvent(new P4RuntimeEvent(
233 P4RuntimeEvent.Type.PACKET_IN,
234 new PacketInEvent(deviceId, pktOperation)));
235 }
236
237 private void handleArbitrationUpdate(P4RuntimeOuterClass.MasterArbitrationUpdate msg) {
238 // From the spec...
239 // - Election_id: The stream RPC with the highest election_id is the
240 // master. Switch populates with the highest election ID it
241 // has received from all connected controllers.
242 // - Status: Switch populates this with OK for the client that is the
243 // master, and with an error status for all other connected clients (at
244 // every mastership change).
245 if (!msg.hasElectionId() || !msg.hasStatus()) {
246 return;
247 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800248 // Is this client master?
249 isMaster.set(msg.getStatus().getCode() == Status.OK.getCode().value());
250 // Notify new master election IDs to all nodes via distributed store.
251 // This is required for those nodes who do not have a Stream RPC open,
252 // and that otherwise would not be aware of changes, keeping their
253 // pending mastership operations forever.
254 final BigInteger masterElectionId = uint128ToBigInteger(msg.getElectionId());
255 masterElectionIdStore.set(deviceId, masterElectionId);
256
257 log.debug("Received arbitration update from {}: isMaster={}, masterElectionId={}",
258 deviceId, isMaster.get(), masterElectionId);
259
260 // Post mastership event via controller.
261 controller.postEvent(new DeviceAgentEvent(
262 isMaster.get() ? DeviceAgentEvent.Type.ROLE_MASTER
263 : DeviceAgentEvent.Type.ROLE_STANDBY, deviceId));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800264 }
265
266 /**
267 * Returns the election ID last used in a MasterArbitrationUpdate message
268 * sent by the client to the server.
269 *
270 * @return election ID uint128 protobuf message
271 */
272 P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800273 return lastUsedElectionId == null
274 ? P4RuntimeOuterClass.Uint128.getDefaultInstance()
275 : bigIntegerToUint128(lastUsedElectionId);
276 }
277
278 /**
279 * Handles updates of the master election ID by applying any pending
280 * mastership operation.
281 */
282 private class InternalMasterElectionIdListener
283 implements MasterElectionIdStore.MasterElectionIdListener {
284
285 @Override
286 public void updated(BigInteger masterElectionId) {
287 handlePendingElectionId(masterElectionId);
288 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800289 }
290
291 /**
292 * A manager for the P4Runtime stream channel that opportunistically creates
293 * new stream RCP stubs (e.g. when one fails because of errors) and posts
294 * channel events via the P4Runtime controller.
295 */
296 private final class StreamChannelManager {
297
298 private final AtomicBoolean open = new AtomicBoolean(false);
299 private final StreamObserver<StreamMessageResponse> responseObserver =
300 new InternalStreamResponseObserver(this);
301 private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
302
303 void send(StreamMessageRequest value) {
304 synchronized (this) {
305 initIfRequired();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800306 requestObserver.onNext(value);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800307 // Optimistically set the session as open. In case of errors, it
308 // will be closed by the response stream observer.
309 streamChannelManager.signalOpen();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800310 }
311 }
312
313 private void initIfRequired() {
314 if (requestObserver == null) {
315 log.debug("Creating new stream channel for {}...", deviceId);
316 open.set(false);
317 client.execRpcNoTimeout(
318 s -> requestObserver =
319 (ClientCallStreamObserver<StreamMessageRequest>)
320 s.streamChannel(responseObserver)
321 );
322 }
323 }
324
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800325 void teardown() {
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800326 synchronized (this) {
327 signalClosed();
328 if (requestObserver != null) {
329 requestObserver.onCompleted();
330 requestObserver.cancel("Completed", null);
331 requestObserver = null;
332 }
333 }
334 }
335
336 void signalOpen() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800337 open.set(true);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800338 }
339
340 void signalClosed() {
341 synchronized (this) {
342 final boolean wasOpen = open.getAndSet(false);
343 if (wasOpen) {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800344 // We lost any valid mastership role.
345 controller.postEvent(new DeviceAgentEvent(
346 DeviceAgentEvent.Type.ROLE_NONE, deviceId));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800347 }
348 }
349 }
350
351 boolean isOpen() {
352 return open.get();
353 }
354 }
355
356 /**
357 * Handles messages received from the device on the stream channel.
358 */
359 private final class InternalStreamResponseObserver
360 implements StreamObserver<StreamMessageResponse> {
361
362 private final StreamChannelManager streamChannelManager;
363
364 private InternalStreamResponseObserver(
365 StreamChannelManager streamChannelManager) {
366 this.streamChannelManager = streamChannelManager;
367 }
368
369 @Override
370 public void onNext(StreamMessageResponse message) {
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800371 try {
372 if (log.isTraceEnabled()) {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800373 log.trace("Received {} from {}: {}",
374 message.getUpdateCase(), deviceId,
375 TextFormat.shortDebugString(message));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800376 }
377 switch (message.getUpdateCase()) {
378 case PACKET:
379 handlePacketIn(message.getPacket());
380 return;
381 case ARBITRATION:
382 handleArbitrationUpdate(message.getArbitration());
383 return;
384 default:
385 log.warn("Unrecognized StreamMessageResponse from {}: {}",
386 deviceId, message.getUpdateCase());
387 }
388 } catch (Throwable ex) {
389 log.error("Exception while processing stream message from {}",
390 deviceId, ex);
391 }
392 }
393
394 @Override
395 public void onError(Throwable throwable) {
396 if (throwable instanceof StatusRuntimeException) {
397 final StatusRuntimeException sre = (StatusRuntimeException) throwable;
398 if (sre.getStatus().getCause() instanceof ConnectException) {
399 log.warn("{} is unreachable ({})",
400 deviceId, sre.getCause().getMessage());
401 } else {
402 log.warn("Error on stream channel for {}: {}",
403 deviceId, throwable.getMessage());
404 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800405 log.debug("", throwable);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800406 } else {
407 log.error(format("Exception on stream channel for %s",
408 deviceId), throwable);
409 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800410 streamChannelManager.teardown();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800411 }
412
413 @Override
414 public void onCompleted() {
415 log.warn("Stream channel for {} has completed", deviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800416 streamChannelManager.teardown();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800417 }
418 }
419}