blob: 11e91aa7f3f76a9bec89f735b9c2e235406db391 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.p4runtime.ctl.client;
18
19import com.google.protobuf.TextFormat;
20import io.grpc.Status;
21import io.grpc.StatusRuntimeException;
22import io.grpc.stub.ClientCallStreamObserver;
23import io.grpc.stub.StreamObserver;
Carmelo Casconeb8a25052019-04-08 15:22:32 -070024import org.onlab.util.SharedScheduledExecutors;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080025import org.onosproject.net.DeviceId;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080026import org.onosproject.net.device.DeviceAgentEvent;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080027import org.onosproject.net.pi.model.PiPipeconf;
28import org.onosproject.net.pi.runtime.PiPacketOperation;
29import org.onosproject.net.pi.service.PiPipeconfService;
30import org.onosproject.p4runtime.api.P4RuntimeEvent;
31import org.onosproject.p4runtime.api.P4RuntimeStreamClient;
32import org.onosproject.p4runtime.ctl.codec.CodecException;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080033import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
34import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore.MasterElectionIdListener;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080035import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
36import org.onosproject.p4runtime.ctl.controller.PacketInEvent;
37import org.slf4j.Logger;
38import p4.v1.P4RuntimeOuterClass;
39import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
40import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
41
42import java.math.BigInteger;
43import java.net.ConnectException;
44import java.nio.ByteBuffer;
Carmelo Casconeb8a25052019-04-08 15:22:32 -070045import java.util.concurrent.ScheduledFuture;
46import java.util.concurrent.TimeUnit;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080047import java.util.concurrent.atomic.AtomicBoolean;
48
Carmelo Cascone3977ea42019-02-28 13:43:42 -080049import static com.google.common.base.Preconditions.checkArgument;
50import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080051import static java.lang.String.format;
Carmelo Casconeb8a25052019-04-08 15:22:32 -070052import static java.lang.System.currentTimeMillis;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080053import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
54import static org.slf4j.LoggerFactory.getLogger;
55
/**
 * Implementation of P4RuntimeStreamClient. Handles P4Runtime StreamChannel RPC
 * operations, such as arbitration update and packet-in/out.
 */
60public final class StreamClientImpl implements P4RuntimeStreamClient {
61
62 private static final Logger log = getLogger(StreamClientImpl.class);
63
Carmelo Casconeb8a25052019-04-08 15:22:32 -070064 private static final int ARBITRATION_RETRY_SECONDS = 3;
65 private static final int ARBITRATION_TIMEOUT_SECONDS = 15;
66
Carmelo Cascone4c289b72019-01-22 15:30:45 -080067 private final P4RuntimeClientImpl client;
68 private final DeviceId deviceId;
69 private final long p4DeviceId;
70 private final PiPipeconfService pipeconfService;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080071 private final MasterElectionIdStore masterElectionIdStore;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080072 private final P4RuntimeControllerImpl controller;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080073
Carmelo Cascone4c289b72019-01-22 15:30:45 -080074 private final StreamChannelManager streamChannelManager = new StreamChannelManager();
Carmelo Cascone3977ea42019-02-28 13:43:42 -080075 private final MasterElectionIdListener masterElectionIdListener = new InternalMasterElectionIdListener();
Carmelo Cascone4c289b72019-01-22 15:30:45 -080076
Carmelo Cascone3977ea42019-02-28 13:43:42 -080077 private final AtomicBoolean isMaster = new AtomicBoolean(false);
78 private final AtomicBoolean requestedToBeMaster = new AtomicBoolean(false);
Carmelo Cascone4c289b72019-01-22 15:30:45 -080079
Carmelo Cascone3977ea42019-02-28 13:43:42 -080080 private BigInteger pendingElectionId = null;
81 private BigInteger lastUsedElectionId = null;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080082
Carmelo Casconeb8a25052019-04-08 15:22:32 -070083 private ScheduledFuture<?> pendingElectionIdRetryTask = null;
84 private long pendingElectionIdTimestamp = 0;
85
/**
 * Creates a new stream client bound to the device of the given client.
 *
 * @param pipeconfService       pipeconf service, used to decode packet-ins
 * @param masterElectionIdStore store of master election IDs
 * @param client                parent client, provides the device IDs and
 *                              the means to execute RPCs
 * @param controller            controller instance used to post events
 */
StreamClientImpl(
        PiPipeconfService pipeconfService,
        MasterElectionIdStore masterElectionIdStore,
        P4RuntimeClientImpl client,
        P4RuntimeControllerImpl controller) {
    // Device identifiers are derived from the parent client.
    this.client = client;
    this.deviceId = client.deviceId();
    this.p4DeviceId = client.p4DeviceId();
    // Services used when handling stream messages.
    this.pipeconfService = pipeconfService;
    this.masterElectionIdStore = masterElectionIdStore;
    this.controller = controller;
}
98
99 @Override
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800100 public boolean isSessionOpen() {
101 return streamChannelManager.isOpen();
102 }
103
104 @Override
105 public void closeSession() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800106 synchronized (requestedToBeMaster) {
107 this.masterElectionIdStore.unsetListener(deviceId);
108 streamChannelManager.teardown();
109 pendingElectionId = null;
110 requestedToBeMaster.set(false);
111 isMaster.set(false);
112 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800113 }
114
115 @Override
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800116 public void setMastership(boolean master, BigInteger newElectionId) {
117 checkNotNull(newElectionId);
118 checkArgument(newElectionId.compareTo(BigInteger.ZERO) > 0,
119 "newElectionId must be a non zero positive number");
120 synchronized (requestedToBeMaster) {
121 requestedToBeMaster.set(master);
122 pendingElectionId = newElectionId;
123 handlePendingElectionId(masterElectionIdStore.get(deviceId));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800124 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800125 }
126
127 private void handlePendingElectionId(BigInteger masterElectionId) {
128 synchronized (requestedToBeMaster) {
129 if (pendingElectionId == null) {
130 // No pending requests.
131 return;
132 }
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700133 // Cancel any pending task. We'll reschedule if needed.
134 if (pendingElectionIdRetryTask != null) {
135 // Do not interrupt if running, as this might be executed by the
136 // pending task itself.
137 pendingElectionIdRetryTask.cancel(false);
138 pendingElectionIdRetryTask = null;
139 }
140 // We implement a deferring mechanism to avoid becoming master when
141 // we shouldn't, i.e. when the requested election ID is bigger than
142 // the master one on the device, but we don't want to be master.
143 // However, we make sure not to defer for more than
144 // ARBITRATION_TIMEOUT_SECONDS.
145 final boolean timeoutExpired;
146 if (pendingElectionIdTimestamp == 0) {
147 pendingElectionIdTimestamp = currentTimeMillis();
148 timeoutExpired = false;
149 } else {
150 timeoutExpired = (currentTimeMillis() - pendingElectionIdTimestamp)
151 > ARBITRATION_TIMEOUT_SECONDS * 1000;
152 }
153 if (timeoutExpired) {
154 log.warn("{} arbitration timeout expired! Will send pending election ID now...",
155 deviceId);
156 }
157 if (!timeoutExpired &&
158 !requestedToBeMaster.get() && masterElectionId != null &&
159 pendingElectionId.compareTo(masterElectionId) > 0) {
160 log.info("Deferring sending master arbitration update for {}, master " +
161 "election ID of server ({}) is smaller than " +
162 "requested one ({}), but we do NOT want to be master...",
163 deviceId, masterElectionId, pendingElectionId);
164 // Will try again as soon as the master election ID store is
165 // updated...
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800166 masterElectionIdStore.setListener(deviceId, masterElectionIdListener);
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700167 // ..or in ARBITRATION_RETRY_SECONDS at the latest (if we missed
168 // the store event).
169 pendingElectionIdRetryTask = SharedScheduledExecutors.newTimeout(
170 () -> handlePendingElectionId(masterElectionIdStore.get(deviceId)),
171 ARBITRATION_RETRY_SECONDS, TimeUnit.SECONDS);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800172 } else {
173 // Send now.
174 log.info("Setting mastership on {}... " +
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700175 "master={}, newElectionId={}, masterElectionId={}",
176 deviceId, requestedToBeMaster.get(),
177 pendingElectionId, masterElectionId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800178 sendMasterArbitrationUpdate(pendingElectionId);
179 pendingElectionId = null;
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700180 pendingElectionIdTimestamp = 0;
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800181 // No need to listen for master election ID changes.
182 masterElectionIdStore.unsetListener(deviceId);
183 }
184 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800185 }
186
187 @Override
188 public boolean isMaster() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800189 return isMaster.get();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800190 }
191
192 @Override
193 public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
194 if (!isSessionOpen()) {
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700195 log.warn("Dropping packet-out request for {}, session is closed",
196 deviceId);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800197 return;
198 }
199 if (log.isTraceEnabled()) {
200 log.trace("Sending packet-out to {}: {}", deviceId, packet);
201 }
202 try {
203 // Encode the PiPacketOperation into a PacketOut
204 final P4RuntimeOuterClass.PacketOut packetOut =
205 CODECS.packetOut().encode(packet, null, pipeconf);
206 // Build the request
207 final StreamMessageRequest packetOutRequest = StreamMessageRequest
208 .newBuilder().setPacket(packetOut).build();
209 // Send.
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800210 streamChannelManager.send(packetOutRequest);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800211 } catch (CodecException e) {
212 log.error("Unable to send packet-out: {}", e.getMessage());
213 }
214 }
215
216 private void sendMasterArbitrationUpdate(BigInteger electionId) {
217 log.debug("Sending arbitration update to {}... electionId={}",
218 deviceId, electionId);
219 final P4RuntimeOuterClass.Uint128 idMsg = bigIntegerToUint128(electionId);
220 streamChannelManager.send(
221 StreamMessageRequest.newBuilder()
222 .setArbitration(
223 P4RuntimeOuterClass.MasterArbitrationUpdate
224 .newBuilder()
225 .setDeviceId(p4DeviceId)
226 .setElectionId(idMsg)
227 .build())
228 .build());
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800229 lastUsedElectionId = electionId;
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800230 }
231
232 private P4RuntimeOuterClass.Uint128 bigIntegerToUint128(BigInteger value) {
233 final byte[] arr = value.toByteArray();
234 final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
235 .put(new byte[Long.BYTES * 2 - arr.length])
236 .put(arr);
237 bb.rewind();
238 return P4RuntimeOuterClass.Uint128.newBuilder()
239 .setHigh(bb.getLong())
240 .setLow(bb.getLong())
241 .build();
242 }
243
244 private BigInteger uint128ToBigInteger(P4RuntimeOuterClass.Uint128 value) {
245 return new BigInteger(
246 ByteBuffer.allocate(Long.BYTES * 2)
247 .putLong(value.getHigh())
248 .putLong(value.getLow())
249 .array());
250 }
251
252 private void handlePacketIn(P4RuntimeOuterClass.PacketIn packetInMsg) {
253 if (log.isTraceEnabled()) {
254 log.trace("Received packet-in from {}: {}", deviceId, packetInMsg);
255 }
256 if (!pipeconfService.getPipeconf(deviceId).isPresent()) {
257 log.warn("Unable to handle packet-in from {}, missing pipeconf: {}",
258 deviceId, TextFormat.shortDebugString(packetInMsg));
259 return;
260 }
261 // Decode packet message and post event.
262 // TODO: consider implementing a cache to speed up
263 // encoding/deconding of packet-in/out (e.g. LLDP, ARP)
264 final PiPipeconf pipeconf = pipeconfService.getPipeconf(deviceId).get();
265 final PiPacketOperation pktOperation;
266 try {
267 pktOperation = CODECS.packetIn().decode(
268 packetInMsg, null, pipeconf);
269 } catch (CodecException e) {
270 log.warn("Unable to process packet-int: {}", e.getMessage());
271 return;
272 }
273 controller.postEvent(new P4RuntimeEvent(
274 P4RuntimeEvent.Type.PACKET_IN,
275 new PacketInEvent(deviceId, pktOperation)));
276 }
277
278 private void handleArbitrationUpdate(P4RuntimeOuterClass.MasterArbitrationUpdate msg) {
279 // From the spec...
280 // - Election_id: The stream RPC with the highest election_id is the
281 // master. Switch populates with the highest election ID it
282 // has received from all connected controllers.
283 // - Status: Switch populates this with OK for the client that is the
284 // master, and with an error status for all other connected clients (at
285 // every mastership change).
286 if (!msg.hasElectionId() || !msg.hasStatus()) {
287 return;
288 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800289 // Is this client master?
290 isMaster.set(msg.getStatus().getCode() == Status.OK.getCode().value());
291 // Notify new master election IDs to all nodes via distributed store.
292 // This is required for those nodes who do not have a Stream RPC open,
293 // and that otherwise would not be aware of changes, keeping their
294 // pending mastership operations forever.
295 final BigInteger masterElectionId = uint128ToBigInteger(msg.getElectionId());
296 masterElectionIdStore.set(deviceId, masterElectionId);
297
298 log.debug("Received arbitration update from {}: isMaster={}, masterElectionId={}",
299 deviceId, isMaster.get(), masterElectionId);
300
301 // Post mastership event via controller.
302 controller.postEvent(new DeviceAgentEvent(
303 isMaster.get() ? DeviceAgentEvent.Type.ROLE_MASTER
304 : DeviceAgentEvent.Type.ROLE_STANDBY, deviceId));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800305 }
306
307 /**
308 * Returns the election ID last used in a MasterArbitrationUpdate message
309 * sent by the client to the server.
310 *
311 * @return election ID uint128 protobuf message
312 */
313 P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800314 return lastUsedElectionId == null
315 ? P4RuntimeOuterClass.Uint128.getDefaultInstance()
316 : bigIntegerToUint128(lastUsedElectionId);
317 }
318
319 /**
320 * Handles updates of the master election ID by applying any pending
321 * mastership operation.
322 */
323 private class InternalMasterElectionIdListener
324 implements MasterElectionIdStore.MasterElectionIdListener {
325
326 @Override
327 public void updated(BigInteger masterElectionId) {
Carmelo Casconeb8a25052019-04-08 15:22:32 -0700328 log.debug("UPDATED master election ID of {}: {}",
329 deviceId, masterElectionId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800330 handlePendingElectionId(masterElectionId);
331 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800332 }
333
334 /**
335 * A manager for the P4Runtime stream channel that opportunistically creates
336 * new stream RCP stubs (e.g. when one fails because of errors) and posts
337 * channel events via the P4Runtime controller.
338 */
339 private final class StreamChannelManager {
340
341 private final AtomicBoolean open = new AtomicBoolean(false);
342 private final StreamObserver<StreamMessageResponse> responseObserver =
343 new InternalStreamResponseObserver(this);
344 private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
345
346 void send(StreamMessageRequest value) {
347 synchronized (this) {
348 initIfRequired();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800349 requestObserver.onNext(value);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800350 // Optimistically set the session as open. In case of errors, it
351 // will be closed by the response stream observer.
352 streamChannelManager.signalOpen();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800353 }
354 }
355
356 private void initIfRequired() {
357 if (requestObserver == null) {
358 log.debug("Creating new stream channel for {}...", deviceId);
359 open.set(false);
360 client.execRpcNoTimeout(
361 s -> requestObserver =
362 (ClientCallStreamObserver<StreamMessageRequest>)
363 s.streamChannel(responseObserver)
364 );
365 }
366 }
367
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800368 void teardown() {
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800369 synchronized (this) {
370 signalClosed();
371 if (requestObserver != null) {
372 requestObserver.onCompleted();
373 requestObserver.cancel("Completed", null);
374 requestObserver = null;
375 }
376 }
377 }
378
379 void signalOpen() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800380 open.set(true);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800381 }
382
383 void signalClosed() {
384 synchronized (this) {
385 final boolean wasOpen = open.getAndSet(false);
386 if (wasOpen) {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800387 // We lost any valid mastership role.
388 controller.postEvent(new DeviceAgentEvent(
389 DeviceAgentEvent.Type.ROLE_NONE, deviceId));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800390 }
391 }
392 }
393
394 boolean isOpen() {
395 return open.get();
396 }
397 }
398
399 /**
400 * Handles messages received from the device on the stream channel.
401 */
402 private final class InternalStreamResponseObserver
403 implements StreamObserver<StreamMessageResponse> {
404
405 private final StreamChannelManager streamChannelManager;
406
407 private InternalStreamResponseObserver(
408 StreamChannelManager streamChannelManager) {
409 this.streamChannelManager = streamChannelManager;
410 }
411
412 @Override
413 public void onNext(StreamMessageResponse message) {
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800414 try {
415 if (log.isTraceEnabled()) {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800416 log.trace("Received {} from {}: {}",
417 message.getUpdateCase(), deviceId,
418 TextFormat.shortDebugString(message));
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800419 }
420 switch (message.getUpdateCase()) {
421 case PACKET:
422 handlePacketIn(message.getPacket());
423 return;
424 case ARBITRATION:
425 handleArbitrationUpdate(message.getArbitration());
426 return;
427 default:
428 log.warn("Unrecognized StreamMessageResponse from {}: {}",
429 deviceId, message.getUpdateCase());
430 }
431 } catch (Throwable ex) {
432 log.error("Exception while processing stream message from {}",
433 deviceId, ex);
434 }
435 }
436
437 @Override
438 public void onError(Throwable throwable) {
439 if (throwable instanceof StatusRuntimeException) {
440 final StatusRuntimeException sre = (StatusRuntimeException) throwable;
441 if (sre.getStatus().getCause() instanceof ConnectException) {
442 log.warn("{} is unreachable ({})",
443 deviceId, sre.getCause().getMessage());
444 } else {
445 log.warn("Error on stream channel for {}: {}",
446 deviceId, throwable.getMessage());
447 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800448 log.debug("", throwable);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800449 } else {
450 log.error(format("Exception on stream channel for %s",
451 deviceId), throwable);
452 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800453 streamChannelManager.teardown();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800454 }
455
456 @Override
457 public void onCompleted() {
458 log.warn("Stream channel for {} has completed", deviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800459 streamChannelManager.teardown();
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800460 }
461 }
462}