Enhanced gRPC call logging
- Write calls to file, one file for each channel
- Log started, inbound msg, outbound msg, and closed events, for each RPC
- Distinguish between different RPCs by assigning an ID to each one
Also, removed redundant DeviceId attribute from GrpcChannelId, as all
channel IDs were already created using a client key that contains the
DeviceID. It seems a better approach to not restrict the definition of a
channel ID and have that defined simply as a string.
Change-Id: I9d88e528218a5689d6835c9b48022119976b6c5a
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index 4ad3166..ca0c8c3 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -17,18 +17,9 @@
package org.onosproject.grpc.ctl;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Striped;
-import io.grpc.CallOptions;
-import io.grpc.Channel;
-import io.grpc.ClientCall;
-import io.grpc.ClientInterceptor;
-import io.grpc.ForwardingClientCall;
-import io.grpc.ForwardingClientCallListener;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.onlab.util.Tools;
@@ -37,7 +28,6 @@
import org.onosproject.grpc.api.GrpcChannelId;
import org.onosproject.grpc.proto.dummy.Dummy;
import org.onosproject.grpc.proto.dummy.DummyServiceGrpc;
-import org.onosproject.net.DeviceId;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -48,14 +38,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
import java.util.Dictionary;
-import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -68,28 +56,31 @@
*/
@Component(immediate = true, service = GrpcChannelController.class,
property = {
- ENABLE_MESSAGE_LOG + ":Boolean=" + ENABLE_MESSAGE_LOG_DEFAULT,
+ ENABLE_MESSAGE_LOG + ":Boolean=" + ENABLE_MESSAGE_LOG_DEFAULT,
})
public class GrpcChannelControllerImpl implements GrpcChannelController {
- // FIXME: Should use message size to determine whether it needs to log the message or not.
- private static final String SET_FORWARDING_PIPELINE_CONFIG_METHOD = "p4.P4Runtime/SetForwardingPipelineConfig";
-
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService componentConfigService;
- /** Indicates whether to log gRPC messages. */
- private static boolean enableMessageLog = ENABLE_MESSAGE_LOG_DEFAULT;
+ /**
+ * Indicates whether to log gRPC messages.
+ */
+ private final AtomicBoolean enableMessageLog = new AtomicBoolean(
+ ENABLE_MESSAGE_LOG_DEFAULT);
private final Logger log = LoggerFactory.getLogger(getClass());
private Map<GrpcChannelId, ManagedChannel> channels;
+ private Map<GrpcChannelId, GrpcLoggingInterceptor> interceptors;
+
private final Striped<Lock> channelLocks = Striped.lock(30);
@Activate
public void activate() {
componentConfigService.registerProperties(getClass());
channels = new ConcurrentHashMap<>();
+ interceptors = new ConcurrentHashMap<>();
log.info("Started");
}
@@ -97,10 +88,12 @@
public void modified(ComponentContext context) {
if (context != null) {
Dictionary<?, ?> properties = context.getProperties();
- enableMessageLog = Tools.isPropertyEnabled(
- properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT);
- log.info("Configured. Log of gRPC messages is {} for new channels",
- enableMessageLog ? "enabled" : "disabled");
+ enableMessageLog.set(Tools.isPropertyEnabled(
+ properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT));
+ log.info("Configured. Logging of gRPC messages is {}",
+ enableMessageLog.get()
+ ? "ENABLED for new channels"
+ : "DISABLED for new and existing channels");
}
}
@@ -109,6 +102,10 @@
componentConfigService.unregisterProperties(getClass(), false);
channels.values().forEach(ManagedChannel::shutdownNow);
channels.clear();
+ channels = null;
+ interceptors.values().forEach(GrpcLoggingInterceptor::close);
+ interceptors.clear();
+ interceptors = null;
log.info("Stopped");
}
@@ -126,8 +123,11 @@
throw new IllegalArgumentException(format(
"A channel with ID '%s' already exists", channelId));
}
- if (enableMessageLog) {
- channelBuilder.intercept(new InternalLogChannelInterceptor(channelId));
+
+ GrpcLoggingInterceptor interceptor = null;
+ if (enableMessageLog.get()) {
+ interceptor = new GrpcLoggingInterceptor(channelId, enableMessageLog);
+ channelBuilder.intercept(interceptor);
}
ManagedChannel channel = channelBuilder.build();
// Forced connection API is still experimental. Use workaround...
@@ -135,11 +135,17 @@
try {
doDummyMessage(channel);
} catch (StatusRuntimeException e) {
+ if (interceptor != null) {
+ interceptor.close();
+ }
shutdownNowAndWait(channel, channelId);
throw e;
}
// If here, channel is open.
channels.put(channelId, channel);
+ if (interceptor != null) {
+ interceptors.put(channelId, interceptor);
+ }
return channel;
} finally {
lock.unlock();
@@ -200,6 +206,10 @@
if (channel != null) {
shutdownNowAndWait(channel, channelId);
}
+ final GrpcLoggingInterceptor interceptor = interceptors.remove(channelId);
+ if (interceptor != null) {
+ interceptor.close();
+ }
} finally {
lock.unlock();
}
@@ -225,19 +235,6 @@
}
@Override
- public Collection<ManagedChannel> getChannels(final DeviceId deviceId) {
- checkNotNull(deviceId);
- final Set<ManagedChannel> deviceChannels = new HashSet<>();
- channels.forEach((k, v) -> {
- if (k.deviceId().equals(deviceId)) {
- deviceChannels.add(v);
- }
- });
-
- return ImmutableSet.copyOf(deviceChannels);
- }
-
- @Override
public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
checkNotNull(channelId);
@@ -251,57 +248,4 @@
}
}
- /**
- * gRPC client interceptor that logs all messages sent and received.
- */
- private final class InternalLogChannelInterceptor implements ClientInterceptor {
-
- private final GrpcChannelId channelId;
-
- private InternalLogChannelInterceptor(GrpcChannelId channelId) {
- this.channelId = channelId;
- }
-
- @Override
- public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
- MethodDescriptor<ReqT, RespT> methodDescriptor,
- CallOptions callOptions, Channel channel) {
- return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
- channel.newCall(methodDescriptor, callOptions.withoutWaitForReady())) {
-
- @Override
- public void sendMessage(ReqT message) {
- if (enableMessageLog && !methodDescriptor.getFullMethodName()
- .startsWith(SET_FORWARDING_PIPELINE_CONFIG_METHOD)) {
- log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}",
- channelId, methodDescriptor.getFullMethodName(),
- message.toString());
- }
- super.sendMessage(message);
- }
-
- @Override
- public void start(Listener<RespT> responseListener, Metadata headers) {
-
- ClientCall.Listener<RespT> listener = new ForwardingClientCallListener<RespT>() {
- @Override
- protected Listener<RespT> delegate() {
- return responseListener;
- }
-
- @Override
- public void onMessage(RespT message) {
- if (enableMessageLog) {
- log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}",
- channelId, methodDescriptor.getFullMethodName(),
- message.toString());
- }
- super.onMessage(message);
- }
- };
- super.start(listener, headers);
- }
- };
- }
- }
}