Handling of table entry messages in P4Runtime

+ synchonized method execution in P4RuntimeClient
+ support for cancellable contexts (for client shutdown)
+ logging of sent/received messages in GrpcControllerImpl
+ minor refactorings

Change-Id: I43f0fcc263579e01957a02ef3392105aed476f33
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
index 152106a..d3262f8 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
@@ -17,8 +17,16 @@
 package org.onosproject.grpc.ctl;
 
 import com.google.common.collect.ImmutableSet;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall;
+import io.grpc.ForwardingClientCallListener;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 import org.apache.felix.scr.annotations.Activate;
@@ -51,6 +59,10 @@
 @Service
 public class GrpcControllerImpl implements GrpcController {
 
+    // Hint: set to true to log all gRPC messages received/sent on all channels.
+    // TODO: make configurable at runtime
+    public static boolean enableMessageLog = false;
+
     private static final int CONNECTION_TIMEOUT_SECONDS = 20;
 
     public static final Logger log = LoggerFactory
@@ -97,6 +109,11 @@
     @Override
     public ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder)
             throws IOException {
+
+        if (enableMessageLog) {
+            channelBuilder.intercept(new InternalLogChannelInterceptor(channelId));
+        }
+
         ManagedChannel channel = channelBuilder.build();
 
         // Forced connection not yet implemented. Use workaround...
@@ -177,4 +194,51 @@
     public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
         return Optional.ofNullable(channels.get(channelId));
     }
+
+    /**
+     * gRPC client interceptor that logs all messages sent and received.
+     */
+    private final class InternalLogChannelInterceptor implements ClientInterceptor {
+
+        private final GrpcChannelId channelId;
+
+        private InternalLogChannelInterceptor(GrpcChannelId channelId) {
+            this.channelId = channelId;
+        }
+
+        @Override
+        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
+                                                                   CallOptions callOptions, Channel channel) {
+            return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(
+                    methodDescriptor, callOptions.withoutWaitForReady())) {
+
+                @Override
+                public void sendMessage(ReqT message) {
+                    log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}", channelId, methodDescriptor.getFullMethodName(),
+                             message.toString());
+                    super.sendMessage(message);
+                }
+
+                @Override
+                public void start(Listener<RespT> responseListener, Metadata headers) {
+
+                    ClientCall.Listener<RespT> listener = new ForwardingClientCallListener<RespT>() {
+                        @Override
+                        protected Listener<RespT> delegate() {
+                            return responseListener;
+                        }
+
+                        @Override
+                        public void onMessage(RespT message) {
+                            log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}", channelId,
+                                     methodDescriptor.getFullMethodName(),
+                                     message.toString());
+                            super.onMessage(message);
+                        }
+                    };
+                    super.start(listener, headers);
+                }
+            };
+        }
+    }
 }