Enhanced gRPC call logging

- Write calls to file, one file for each channel
- Log started, inbound msg, outbound msg, and closed events, for each RPC
- Distinguish between different RPCs by assigning an ID to each one

Also, removed redundant DeviceId attribute from GrpcChannelId, as all
channel IDs were already created using a client key that contains the
DeviceID. It seems a better approach to not restrict the definition of a
channel ID and have that defined simply as a string.

Change-Id: I9d88e528218a5689d6835c9b48022119976b6c5a
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
index fdd5980..5b07254 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
@@ -20,9 +20,7 @@
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.StatusRuntimeException;
-import org.onosproject.net.DeviceId;
 
-import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 
@@ -83,14 +81,6 @@
     boolean isChannelOpen(GrpcChannelId channelId);
 
     /**
-     * Returns all channels associated to the given device ID.
-     *
-     * @param deviceId device ID
-     * @return collection of channels
-     */
-    Collection<ManagedChannel> getChannels(DeviceId deviceId);
-
-    /**
      * If present, returns the channel associated with the given ID.
      *
      * @param channelId channel ID
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java
index 87609f4..6db331a 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java
@@ -18,57 +18,24 @@
 
 import com.google.common.annotations.Beta;
 import org.onlab.util.Identifier;
-import org.onosproject.net.DeviceId;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
- * gRPC managed channel identifier, unique in the scope of a gRPC controller
- * instance.
+ * gRPC channel identifier, unique in the scope of an ONOS node.
  */
 @Beta
 public final class GrpcChannelId extends Identifier<String> {
 
-    private final DeviceId deviceId;
-    private final String channelName;
-
-    private GrpcChannelId(DeviceId deviceId, String channelName) {
-        super(deviceId.toString() + ":" + channelName);
-        checkNotNull(deviceId, "device ID must not be null");
-        checkNotNull(channelName, "channel name must not be null");
-        checkArgument(!channelName.isEmpty(), "channel name must not be empty");
-        this.deviceId = deviceId;
-        this.channelName = channelName;
+    private GrpcChannelId(String channelName) {
+        super(channelName);
     }
 
     /**
-     * Returns the device part of this channel ID.
+     * Instantiates a new channel ID.
      *
-     * @return device ID
-     */
-    public DeviceId deviceId() {
-        return deviceId;
-    }
-
-    /**
-     * Returns the channel name part of this channel ID.
-     *
-     * @return channel name
-     */
-    public String channelName() {
-        return channelName;
-    }
-
-    /**
-     * Instantiates a new channel ID for the given device ID and arbitrary
-     * channel name (e.g. the name of the gRPC service).
-     *
-     * @param deviceId    device ID
      * @param channelName name of the channel
      * @return channel ID
      */
-    public static GrpcChannelId of(DeviceId deviceId, String channelName) {
-        return new GrpcChannelId(deviceId, channelName);
+    public static GrpcChannelId of(String channelName) {
+        return new GrpcChannelId(channelName);
     }
 }
diff --git a/protocols/grpc/ctl/BUILD b/protocols/grpc/ctl/BUILD
index ac0703d..11f98c0 100644
--- a/protocols/grpc/ctl/BUILD
+++ b/protocols/grpc/ctl/BUILD
@@ -3,6 +3,9 @@
     "//protocols/grpc/proto:onos-protocols-grpc-proto",
     "@io_grpc_grpc_java//core",
     "@io_grpc_grpc_java//netty",
+    "@io_grpc_grpc_java//protobuf-lite",
+    "@com_google_protobuf//:protobuf_java",
+    "@com_google_api_grpc_proto_google_common_protos//jar",
 ]
 
 osgi_jar(
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
index e5f4884..dc7ae9d 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
@@ -115,7 +115,7 @@
 
         log.info("Creating client for {} (server={}:{})...",
                 deviceId, serverAddr, serverPort);
-        GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(), clientKey.toString());
+        GrpcChannelId channelId = GrpcChannelId.of(clientKey.toString());
         ManagedChannelBuilder channelBuilder = NettyChannelBuilder
                 .forAddress(serverAddr, serverPort)
                 .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES)
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index 4ad3166..ca0c8c3 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -17,18 +17,9 @@
 package org.onosproject.grpc.ctl;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.Striped;
-import io.grpc.CallOptions;
-import io.grpc.Channel;
-import io.grpc.ClientCall;
-import io.grpc.ClientInterceptor;
-import io.grpc.ForwardingClientCall;
-import io.grpc.ForwardingClientCallListener;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 import org.onlab.util.Tools;
@@ -37,7 +28,6 @@
 import org.onosproject.grpc.api.GrpcChannelId;
 import org.onosproject.grpc.proto.dummy.Dummy;
 import org.onosproject.grpc.proto.dummy.DummyServiceGrpc;
-import org.onosproject.net.DeviceId;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -48,14 +38,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
 import java.util.Dictionary;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -68,28 +56,31 @@
  */
 @Component(immediate = true, service = GrpcChannelController.class,
         property = {
-            ENABLE_MESSAGE_LOG + ":Boolean=" + ENABLE_MESSAGE_LOG_DEFAULT,
+                ENABLE_MESSAGE_LOG + ":Boolean=" + ENABLE_MESSAGE_LOG_DEFAULT,
         })
 public class GrpcChannelControllerImpl implements GrpcChannelController {
 
-    // FIXME: Should use message size to determine whether it needs to log the message or not.
-    private static final String SET_FORWARDING_PIPELINE_CONFIG_METHOD = "p4.P4Runtime/SetForwardingPipelineConfig";
-
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService componentConfigService;
 
-    /** Indicates whether to log gRPC messages. */
-    private static boolean enableMessageLog = ENABLE_MESSAGE_LOG_DEFAULT;
+    /**
+     * Indicates whether to log gRPC messages.
+     */
+    private final AtomicBoolean enableMessageLog = new AtomicBoolean(
+            ENABLE_MESSAGE_LOG_DEFAULT);
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private Map<GrpcChannelId, ManagedChannel> channels;
+    private Map<GrpcChannelId, GrpcLoggingInterceptor> interceptors;
+
     private final Striped<Lock> channelLocks = Striped.lock(30);
 
     @Activate
     public void activate() {
         componentConfigService.registerProperties(getClass());
         channels = new ConcurrentHashMap<>();
+        interceptors = new ConcurrentHashMap<>();
         log.info("Started");
     }
 
@@ -97,10 +88,12 @@
     public void modified(ComponentContext context) {
         if (context != null) {
             Dictionary<?, ?> properties = context.getProperties();
-            enableMessageLog = Tools.isPropertyEnabled(
-                    properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT);
-            log.info("Configured. Log of gRPC messages is {} for new channels",
-                     enableMessageLog ? "enabled" : "disabled");
+            enableMessageLog.set(Tools.isPropertyEnabled(
+                    properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT));
+            log.info("Configured. Logging of gRPC messages is {}",
+                     enableMessageLog.get()
+                             ? "ENABLED for new channels"
+                             : "DISABLED for new and existing channels");
         }
     }
 
@@ -109,6 +102,10 @@
         componentConfigService.unregisterProperties(getClass(), false);
         channels.values().forEach(ManagedChannel::shutdownNow);
         channels.clear();
+        channels = null;
+        interceptors.values().forEach(GrpcLoggingInterceptor::close);
+        interceptors.clear();
+        interceptors = null;
         log.info("Stopped");
     }
 
@@ -126,8 +123,11 @@
                 throw new IllegalArgumentException(format(
                         "A channel with ID '%s' already exists", channelId));
             }
-            if (enableMessageLog) {
-                channelBuilder.intercept(new InternalLogChannelInterceptor(channelId));
+
+            GrpcLoggingInterceptor interceptor = null;
+            if (enableMessageLog.get()) {
+                interceptor = new GrpcLoggingInterceptor(channelId, enableMessageLog);
+                channelBuilder.intercept(interceptor);
             }
             ManagedChannel channel = channelBuilder.build();
             // Forced connection API is still experimental. Use workaround...
@@ -135,11 +135,17 @@
             try {
                 doDummyMessage(channel);
             } catch (StatusRuntimeException e) {
+                if (interceptor != null) {
+                    interceptor.close();
+                }
                 shutdownNowAndWait(channel, channelId);
                 throw e;
             }
             // If here, channel is open.
             channels.put(channelId, channel);
+            if (interceptor != null) {
+                interceptors.put(channelId, interceptor);
+            }
             return channel;
         } finally {
             lock.unlock();
@@ -200,6 +206,10 @@
             if (channel != null) {
                 shutdownNowAndWait(channel, channelId);
             }
+            final GrpcLoggingInterceptor interceptor = interceptors.remove(channelId);
+            if (interceptor != null) {
+                interceptor.close();
+            }
         } finally {
             lock.unlock();
         }
@@ -225,19 +235,6 @@
     }
 
     @Override
-    public Collection<ManagedChannel> getChannels(final DeviceId deviceId) {
-        checkNotNull(deviceId);
-        final Set<ManagedChannel> deviceChannels = new HashSet<>();
-        channels.forEach((k, v) -> {
-            if (k.deviceId().equals(deviceId)) {
-                deviceChannels.add(v);
-            }
-        });
-
-        return ImmutableSet.copyOf(deviceChannels);
-    }
-
-    @Override
     public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
         checkNotNull(channelId);
 
@@ -251,57 +248,4 @@
         }
     }
 
-    /**
-     * gRPC client interceptor that logs all messages sent and received.
-     */
-    private final class InternalLogChannelInterceptor implements ClientInterceptor {
-
-        private final GrpcChannelId channelId;
-
-        private InternalLogChannelInterceptor(GrpcChannelId channelId) {
-            this.channelId = channelId;
-        }
-
-        @Override
-        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
-                MethodDescriptor<ReqT, RespT> methodDescriptor,
-                CallOptions callOptions, Channel channel) {
-            return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
-                    channel.newCall(methodDescriptor, callOptions.withoutWaitForReady())) {
-
-                @Override
-                public void sendMessage(ReqT message) {
-                    if (enableMessageLog && !methodDescriptor.getFullMethodName()
-                            .startsWith(SET_FORWARDING_PIPELINE_CONFIG_METHOD)) {
-                        log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}",
-                                 channelId, methodDescriptor.getFullMethodName(),
-                                 message.toString());
-                    }
-                    super.sendMessage(message);
-                }
-
-                @Override
-                public void start(Listener<RespT> responseListener, Metadata headers) {
-
-                    ClientCall.Listener<RespT> listener = new ForwardingClientCallListener<RespT>() {
-                        @Override
-                        protected Listener<RespT> delegate() {
-                            return responseListener;
-                        }
-
-                        @Override
-                        public void onMessage(RespT message) {
-                            if (enableMessageLog) {
-                                log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}",
-                                         channelId, methodDescriptor.getFullMethodName(),
-                                         message.toString());
-                            }
-                            super.onMessage(message);
-                        }
-                    };
-                    super.start(listener, headers);
-                }
-            };
-        }
-    }
 }
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java
new file mode 100644
index 0000000..774152e
--- /dev/null
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.ctl;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall;
+import io.grpc.ForwardingClientCallListener;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.protobuf.lite.ProtoLiteUtils;
+import org.onosproject.grpc.api.GrpcChannelId;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.StringJoiner;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.lang.String.format;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * gRPC client interceptor that logs to file all messages sent and received.
+ */
+final class GrpcLoggingInterceptor implements ClientInterceptor {
+
+    private static final Metadata.Key<com.google.rpc.Status> GRPC_STATUS_KEY =
+            Metadata.Key.of(
+                    "grpc-status-details-bin",
+                    ProtoLiteUtils.metadataMarshaller(
+                            com.google.rpc.Status.getDefaultInstance()));
+
+    private static final Logger log = getLogger(GrpcLoggingInterceptor.class);
+
+    private final AtomicLong callIdGenerator = new AtomicLong();
+    private final GrpcChannelId channelId;
+    private final AtomicBoolean enabled;
+
+    private FileWriter writer;
+
+    GrpcLoggingInterceptor(GrpcChannelId channelId, AtomicBoolean enabled) {
+        this.channelId = channelId;
+        this.enabled = enabled;
+        try {
+            writer = initWriter();
+            write(format("GRPC CALL LOG - %s\n\n", channelId));
+        } catch (IOException e) {
+            log.error("Unable to initialize gRPC call log writer", e);
+        }
+    }
+
+    private FileWriter initWriter() throws IOException {
+        final String safeChName = channelId.id()
+                .replaceAll("[^A-Za-z0-9]", "_");
+        final String fileName = format("grpc_%s_", safeChName).toLowerCase();
+        final File tmpFile = File.createTempFile(fileName, ".log");
+        log.info("Created gRPC call log file for channel {}: {}",
+                 channelId, tmpFile.getAbsolutePath());
+        return new FileWriter(tmpFile);
+    }
+
+    void close() {
+        synchronized (this) {
+            if (writer == null) {
+                return;
+            }
+            try {
+                log.info("Closing log writer for {}...", channelId);
+                writer.close();
+            } catch (IOException e) {
+                log.error("Unable to close gRPC call log writer", e);
+            }
+            writer = null;
+        }
+    }
+
+    private void write(String message) {
+        synchronized (this) {
+            if (writer != null) {
+                if (message.length() > 4096) {
+                    message = message.substring(0, 256) + "... TRUNCATED!\n\n";
+                }
+                try {
+                    writer.write(format(
+                            "*** %s - %s",
+                            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S")
+                                    .format(new Date()),
+                            message));
+                } catch (IOException e) {
+                    log.error("Unable to write gRPC call log", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+            MethodDescriptor<ReqT, RespT> methodDescriptor,
+            CallOptions callOptions, Channel channel) {
+        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
+                channel.newCall(methodDescriptor, callOptions.withoutWaitForReady())) {
+
+            private final long callId = callIdGenerator.getAndIncrement();
+
+            @Override
+            public void sendMessage(ReqT message) {
+                if (enabled.get()) {
+                    write(format(
+                            "%s >> OUTBOUND >> [callId=%s]\n%s\n",
+                            methodDescriptor.getFullMethodName(),
+                            callId,
+                            message.toString()));
+                }
+                super.sendMessage(message);
+            }
+
+            @Override
+            public void start(Listener<RespT> responseListener, Metadata headers) {
+
+                if (enabled.get()) {
+                    write(format(
+                            "%s STARTED [callId=%s]\n%s\n\n",
+                            methodDescriptor.getFullMethodName(),
+                            callId,
+                            headers.toString()));
+                }
+
+                Listener<RespT> listener = new ForwardingClientCallListener<RespT>() {
+                    @Override
+                    protected Listener<RespT> delegate() {
+                        return responseListener;
+                    }
+
+                    @Override
+                    public void onMessage(RespT message) {
+                        if (enabled.get()) {
+                            write(format(
+                                    "%s << INBOUND << [callId=%s]\n%s\n",
+                                    methodDescriptor.getFullMethodName(),
+                                    callId,
+                                    message.toString()));
+                        }
+                        super.onMessage(message);
+                    }
+
+                    @Override
+                    public void onClose(Status status, Metadata trailers) {
+                        if (enabled.get()) {
+                            write(format(
+                                    "%s CLOSED [callId=%s]\n%s\n%s\n\n",
+                                    methodDescriptor.getFullMethodName(),
+                                    callId,
+                                    status.toString(),
+                                    parseTrailers(trailers)));
+                        }
+                        super.onClose(status, trailers);
+                    }
+
+                    private String parseTrailers(Metadata trailers) {
+                        StringJoiner joiner = new StringJoiner("\n");
+                        joiner.add(trailers.toString());
+                        // If Google's RPC Status trailers are found, parse them.
+                        final Iterable<com.google.rpc.Status> statuses = trailers.getAll(
+                                GRPC_STATUS_KEY);
+                        if (statuses == null) {
+                            return joiner.toString();
+                        }
+                        statuses.forEach(s -> joiner.add(s.toString()));
+                        return joiner.toString();
+                    }
+
+                    @Override
+                    public void onHeaders(Metadata headers) {
+                        super.onHeaders(headers);
+                    }
+                };
+
+                super.start(listener, headers);
+            }
+        };
+    }
+}