Handling of table entry messages in P4Runtime
+ synchronized method execution in P4RuntimeClient
+ support for cancellable contexts (for client shutdown)
+ logging of sent/received messages in GrpcControllerImpl
+ minor refactorings
Change-Id: I43f0fcc263579e01957a02ef3392105aed476f33
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
index 152106a..d3262f8 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
@@ -17,8 +17,16 @@
package org.onosproject.grpc.ctl;
import com.google.common.collect.ImmutableSet;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall;
+import io.grpc.ForwardingClientCallListener;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.apache.felix.scr.annotations.Activate;
@@ -51,6 +59,10 @@
@Service
public class GrpcControllerImpl implements GrpcController {
+ // Hint: set to true to log all gRPC messages received/sent on all channels.
+ // TODO: make configurable at runtime
+ public static boolean enableMessageLog = false;
+
private static final int CONNECTION_TIMEOUT_SECONDS = 20;
public static final Logger log = LoggerFactory
@@ -97,6 +109,11 @@
@Override
public ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder)
throws IOException {
+
+ if (enableMessageLog) {
+ channelBuilder.intercept(new InternalLogChannelInterceptor(channelId));
+ }
+
ManagedChannel channel = channelBuilder.build();
// Forced connection not yet implemented. Use workaround...
@@ -177,4 +194,51 @@
public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
return Optional.ofNullable(channels.get(channelId));
}
+
+ /**
+ * gRPC client interceptor that logs, at INFO level, every message sent or received through it, tagged with the channel ID.
+ */
+ private final class InternalLogChannelInterceptor implements ClientInterceptor {
+
+ private final GrpcChannelId channelId; // printed in every log line to identify the channel
+
+ private InternalLogChannelInterceptor(GrpcChannelId channelId) {
+ this.channelId = channelId;
+ }
+
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
+ CallOptions callOptions, Channel channel) {
+ return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(
+ methodDescriptor, callOptions.withoutWaitForReady())) { // NOTE(review): alters call options, not only logging - confirm intended
+ // Outbound path: log the request together with the full method name, then forward it unchanged.
+ @Override
+ public void sendMessage(ReqT message) {
+ log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}", channelId, methodDescriptor.getFullMethodName(),
+ message.toString());
+ super.sendMessage(message);
+ }
+
+ @Override
+ public void start(Listener<RespT> responseListener, Metadata headers) {
+ // Inbound path: wrap the caller's listener so each response is logged before being passed on to it.
+ ClientCall.Listener<RespT> listener = new ForwardingClientCallListener<RespT>() {
+ @Override
+ protected Listener<RespT> delegate() {
+ return responseListener; // all callbacks not overridden below go straight to the caller's listener
+ }
+
+ @Override
+ public void onMessage(RespT message) {
+ log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}", channelId,
+ methodDescriptor.getFullMethodName(),
+ message.toString());
+ super.onMessage(message);
+ }
+ };
+ super.start(listener, headers); // start the underlying call with the logging listener in place of the original
+ }
+ };
+ }
+ }
}