Allow sharing the same gRPC channel between clients
This change introduces a refactoring of the gRPC protocol subsystem that
allows the creation of a gRPC chanel independently of the client, while
allowing multiple clients to share the same channel (e.g. as in Stratum
where we use 3 clients).
Moreover, we refactor the P4RuntimeClient API to support multiple
P4Runtime-internal device ID using the same client. While before the
client was associated to one of such ID.
Finally, we provide an abstract implementation for gRPC-based driver
behaviors, reducing code duplication in P4Runtime, gNMI and gNOI drivers.
Change-Id: I1a46352bbbef1e0d24042f169ae8ba580202944f
diff --git a/protocols/grpc/ctl/BUILD b/protocols/grpc/ctl/BUILD
index a467398..fa10ff3 100644
--- a/protocols/grpc/ctl/BUILD
+++ b/protocols/grpc/ctl/BUILD
@@ -1,6 +1,5 @@
COMPILE_DEPS = CORE_DEPS + [
"//protocols/grpc/api:onos-protocols-grpc-api",
- "//protocols/grpc/proto:onos-protocols-grpc-proto",
"@io_grpc_grpc_java//core",
"@io_grpc_grpc_java//netty",
"@io_grpc_grpc_java//protobuf-lite",
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
index befa334..933c7ce 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -21,7 +21,6 @@
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import org.onosproject.grpc.api.GrpcClient;
-import org.onosproject.grpc.api.GrpcClientKey;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceAgentEvent;
import org.slf4j.Logger;
@@ -49,25 +48,26 @@
private final AtomicBoolean channelOpen = new AtomicBoolean(false);
/**
- * Creates an new client for the given key and channel. Setting persistent
- * to true avoids the gRPC channel to stay IDLE. The controller instance is
- * needed to propagate channel events.
+ * Creates an new client for the given device and channel. Setting
+ * persistent to true avoids the gRPC channel to go {@link
+ * ConnectivityState#IDLE}. The controller instance is needed to propagate
+ * channel events.
*
- * @param clientKey client key
+ * @param deviceId device ID
* @param channel channel
* @param persistent true if the gRPC should never stay IDLE
* @param controller controller
*/
- protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel,
+ protected AbstractGrpcClient(DeviceId deviceId, ManagedChannel channel,
boolean persistent, AbstractGrpcClientController controller) {
- checkNotNull(clientKey);
+ checkNotNull(deviceId);
checkNotNull(channel);
- this.deviceId = clientKey.deviceId();
+ this.deviceId = deviceId;
this.channel = channel;
this.persistent = persistent;
this.controller = controller;
- setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING);
+ setChannelCallback(ConnectivityState.CONNECTING);
}
@Override
@@ -94,7 +94,7 @@
"ignoring request to shutdown for {}...", deviceId);
return;
}
- log.warn("Shutting down client for {}...", deviceId);
+ log.debug("Shutting down client for {}...", deviceId);
cancellableContext.cancel(new InterruptedException(
"Requested client shutdown"));
}
@@ -140,84 +140,70 @@
opDescription, deviceId), throwable);
}
- private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,
- ConnectivityState sourceState) {
+ private void setChannelCallback(ConnectivityState sourceState) {
if (log.isTraceEnabled()) {
log.trace("Setting channel callback for {} with source state {}...",
deviceId, sourceState);
}
channel.notifyWhenStateChanged(
- sourceState, new ChannelConnectivityCallback(deviceId, channel));
+ sourceState, this::channelStateCallback);
}
/**
- * Runnable task invoked at each change of the channel connectivity state.
- * New callbacks are created as long as the channel is not shut down.
+ * Invoked at each change of the channel connectivity state. New callbacks
+ * are created as long as the channel is not shut down.
*/
- private final class ChannelConnectivityCallback implements Runnable {
-
- private final DeviceId deviceId;
- private final ManagedChannel channel;
-
- private ChannelConnectivityCallback(
- DeviceId deviceId, ManagedChannel channel) {
- this.deviceId = deviceId;
- this.channel = channel;
+ private void channelStateCallback() {
+ final ConnectivityState newState = channel.getState(false);
+ final DeviceAgentEvent.Type eventType;
+ switch (newState) {
+ // On gRPC connectivity states:
+ // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
+ case READY:
+ eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
+ break;
+ case TRANSIENT_FAILURE:
+ eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
+ break;
+ case SHUTDOWN:
+ eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+ break;
+ case IDLE:
+ // IDLE and CONNECTING are transient states that will
+ // eventually move to READY or TRANSIENT_FAILURE. Do not
+ // generate an event for now.
+ if (persistent) {
+ log.debug("Forcing channel for {} to exist state IDLE...", deviceId);
+ channel.getState(true);
+ }
+ eventType = null;
+ break;
+ case CONNECTING:
+ eventType = null;
+ break;
+ default:
+ log.error("Unrecognized connectivity state {}", newState);
+ eventType = null;
}
- @Override
- public void run() {
- final ConnectivityState newState = channel.getState(false);
- final DeviceAgentEvent.Type eventType;
- switch (newState) {
- // On gRPC connectivity states:
- // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
- case READY:
- eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
- break;
- case TRANSIENT_FAILURE:
- eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
- break;
- case SHUTDOWN:
- eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
- break;
- case IDLE:
- // IDLE and CONNECTING are transient states that will
- // eventually move to READY or TRANSIENT_FAILURE. Do not
- // generate an event for now.
- if (persistent) {
- log.debug("Forcing channel for {} to exist state IDLE...", deviceId);
- channel.getState(true);
- }
- eventType = null;
- break;
- case CONNECTING:
- eventType = null;
- break;
- default:
- log.error("Unrecognized connectivity state {}", newState);
- eventType = null;
- }
+ if (log.isTraceEnabled()) {
+ log.trace("Detected channel connectivity change for {}, new state is {}",
+ deviceId, newState);
+ }
- if (log.isTraceEnabled()) {
- log.trace("Detected channel connectivity change for {}, new state is {}",
- deviceId, newState);
+ if (eventType != null) {
+ // Avoid sending consecutive duplicate events.
+ final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
+ final boolean past = channelOpen.getAndSet(present);
+ if (present != past) {
+ log.debug("Notifying event {} for {}", eventType, deviceId);
+ controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
}
+ }
- if (eventType != null) {
- // Avoid sending consecutive duplicate events.
- final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
- final boolean past = channelOpen.getAndSet(present);
- if (present != past) {
- log.debug("Notifying event {} for {}", eventType, deviceId);
- controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
- }
- }
-
- if (newState != ConnectivityState.SHUTDOWN) {
- // Channels never leave SHUTDOWN state, no need for a new callback.
- setChannelCallback(deviceId, channel, newState);
- }
+ if (newState != ConnectivityState.SHUTDOWN) {
+ // Channels never leave SHUTDOWN state, no need for a new callback.
+ setChannelCallback(newState);
}
}
}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
index 3c38209..9fb3625 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
@@ -19,29 +19,19 @@
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Striped;
import io.grpc.ManagedChannel;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NettyChannelBuilder;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.event.Event;
import org.onosproject.event.EventListener;
-import org.onosproject.grpc.api.GrpcChannelController;
-import org.onosproject.grpc.api.GrpcChannelId;
import org.onosproject.grpc.api.GrpcClient;
import org.onosproject.grpc.api.GrpcClientController;
-import org.onosproject.grpc.api.GrpcClientKey;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceAgentEvent;
import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.provider.ProviderId;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
-import javax.net.ssl.SSLException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
@@ -52,40 +42,35 @@
import static org.slf4j.LoggerFactory.getLogger;
/**
- * Abstract class of a gRPC based client controller for specific gRPC client
- * which provides basic gRPC client management and thread safe mechanism.
+ * Abstract class of a controller for gRPC clients which provides means to
+ * create clients, associate device agent listeners to them and register other
+ * event listeners.
*
* @param <C> the gRPC client type
- * @param <K> the key type of the gRPC client
* @param <E> the event type of the gRPC client
* @param <L> the event listener of event {@link E}
*/
public abstract class AbstractGrpcClientController
- <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
+ <C extends GrpcClient, E extends Event, L extends EventListener<E>>
extends AbstractListenerManager<E, L>
- implements GrpcClientController<K, C> {
+ implements GrpcClientController<C> {
/**
* The default max inbound message size (MB).
*/
- private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
- private static final int MEGABYTES = 1024 * 1024;
private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
private final Logger log = getLogger(getClass());
- private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
- private final Map<K, C> clients = Maps.newHashMap();
- private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+ private final Map<DeviceId, C> clients = Maps.newHashMap();
private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
deviceAgentListeners = Maps.newConcurrentMap();
private final Class<E> eventClass;
+ private final String serviceName;
private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected GrpcChannelController grpcChannelController;
-
- public AbstractGrpcClientController(Class<E> eventClass) {
+ public AbstractGrpcClientController(Class<E> eventClass, String serviceName) {
this.eventClass = eventClass;
+ this.serviceName = serviceName;
}
@Activate
@@ -97,162 +82,67 @@
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(eventClass);
- clientKeys.keySet().forEach(this::removeClient);
- clientKeys.clear();
clients.clear();
- channelIds.clear();
deviceAgentListeners.clear();
log.info("Stopped");
}
@Override
- public boolean createClient(K clientKey) {
- checkNotNull(clientKey);
- return withDeviceLock(() -> doCreateClient(clientKey),
- clientKey.deviceId());
+ public boolean create(DeviceId deviceId, ManagedChannel channel) {
+ checkNotNull(deviceId);
+ checkNotNull(channel);
+ return withDeviceLock(() -> doCreateClient(deviceId, channel), deviceId);
}
- private boolean doCreateClient(K clientKey) {
- DeviceId deviceId = clientKey.deviceId();
+ private boolean doCreateClient(DeviceId deviceId, ManagedChannel channel) {
- if (clientKeys.containsKey(deviceId)) {
- final GrpcClientKey existingKey = clientKeys.get(deviceId);
- if (clientKey.equals(existingKey)) {
- log.debug("Not creating {} as it already exists... (key={})",
- clientName(clientKey), clientKey);
- return true;
- } else {
- throw new IllegalArgumentException(format(
- "A client already exists for device %s (%s)",
- deviceId, clientKey));
- }
- }
-
- final String method = clientKey.requiresSecureChannel()
- ? "TLS" : "plaintext TCP";
-
- log.info("Connecting {} client for {} to server at {} using {}...",
- clientKey.serviceName(), deviceId, clientKey.serveUri(), method);
-
- SslContext sslContext = null;
- if (clientKey.requiresSecureChannel()) {
- try {
- // FIXME: Accept any server certificate; this is insecure and
- // should not be used in production
- sslContext = GrpcSslContexts.forClient().trustManager(
- InsecureTrustManagerFactory.INSTANCE).build();
- } catch (SSLException e) {
- log.error("Failed to build SSL Context", e);
- return false;
- }
- }
-
- GrpcChannelId channelId = GrpcChannelId.of(clientKey.toString());
- NettyChannelBuilder channelBuilder = NettyChannelBuilder
- .forAddress(clientKey.serveUri().getHost(),
- clientKey.serveUri().getPort())
- .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
-
- if (sslContext != null) {
- channelBuilder
- .sslContext(sslContext)
- .useTransportSecurity();
- } else {
- channelBuilder.usePlaintext();
- }
-
- final ManagedChannel channel;
-
- try {
- channel = grpcChannelController.connectChannel(channelId, channelBuilder);
- } catch (Throwable e) {
- log.warn("Failed to connect to {} ({}) using {}: {}",
- deviceId, clientKey.serveUri(), method, e.toString());
- log.debug("gRPC client connection exception", e);
- return false;
+ if (clients.containsKey(deviceId)) {
+ throw new IllegalArgumentException(format(
+ "A %s client already exists for %s", serviceName, deviceId));
}
final C client;
try {
- client = createClientInstance(clientKey, channel);
+ client = createClientInstance(deviceId, channel);
} catch (Throwable e) {
- log.error("Exception while creating {}", clientName(clientKey), e);
- grpcChannelController.disconnectChannel(channelId);
+ log.error("Exception while creating {}", clientName(deviceId), e);
return false;
}
if (client == null) {
- log.error("Unable to create {}, implementation returned null... (key={})",
- clientName(clientKey), clientKey);
- grpcChannelController.disconnectChannel(channelId);
+ log.error("Unable to create {}, implementation returned null...",
+ clientName(deviceId));
return false;
}
- clientKeys.put(deviceId, clientKey);
- clients.put(clientKey, client);
- channelIds.put(deviceId, channelId);
-
+ clients.put(deviceId, client);
return true;
}
- protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
+ protected abstract C createClientInstance(DeviceId deviceId, ManagedChannel channel);
@Override
- public C getClient(DeviceId deviceId) {
+ public C get(DeviceId deviceId) {
checkNotNull(deviceId);
- return withDeviceLock(() -> doGetClient(deviceId), deviceId);
+ return withDeviceLock(() -> clients.get(deviceId), deviceId);
}
- private C doGetClient(DeviceId deviceId) {
- if (!clientKeys.containsKey(deviceId)) {
+ @Override
+ public void remove(DeviceId deviceId) {
+ checkNotNull(deviceId);
+ withDeviceLock(() -> {
+ final C client = clients.remove(deviceId);
+ if (client != null) {
+ client.shutdown();
+ }
return null;
- }
- return clients.get(clientKeys.get(deviceId));
- }
-
- @Override
- public C getClient(K clientKey) {
- checkNotNull(clientKey);
- return clients.get(clientKey);
- }
-
- @Override
- public void removeClient(DeviceId deviceId) {
- checkNotNull(deviceId);
- withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
- }
-
- @Override
- public void removeClient(K clientKey) {
- checkNotNull(clientKey);
- withDeviceLock(() -> doRemoveClient(clientKey), clientKey.deviceId());
- }
-
- private Void doRemoveClient(DeviceId deviceId) {
- if (clientKeys.containsKey(deviceId)) {
- doRemoveClient(clientKeys.get(deviceId));
- }
- return null;
- }
-
- private Void doRemoveClient(K clientKey) {
- if (clients.containsKey(clientKey)) {
- clients.get(clientKey).shutdown();
- }
- if (channelIds.containsKey(clientKey.deviceId())) {
- grpcChannelController.disconnectChannel(
- channelIds.get(clientKey.deviceId()));
- }
- clientKeys.remove(clientKey.deviceId());
- clients.remove(clientKey);
- channelIds.remove(clientKey.deviceId());
- return null;
+ }, deviceId);
}
@Override
public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
checkNotNull(deviceId, "deviceId cannot be null");
- checkNotNull(deviceId, "providerId cannot be null");
+ checkNotNull(providerId, "providerId cannot be null");
checkNotNull(listener, "listener cannot be null");
deviceAgentListeners.putIfAbsent(deviceId, Maps.newConcurrentMap());
deviceAgentListeners.get(deviceId).put(providerId, listener);
@@ -277,7 +167,7 @@
// We should have only one event delivery mechanism. We have two now
// because we have two different types of events, DeviceAgentEvent and
// controller/protocol specific ones (e.g. P4Runtime or gNMI).
- // TODO: extend device agent event to allow delivery protocol-specific
+ // TODO: extend device agent event to allow delivery of protocol-specific
// events, e.g. packet-in
checkNotNull(event);
if (deviceAgentListeners.containsKey(event.subject())) {
@@ -296,7 +186,7 @@
}
}
- private String clientName(GrpcClientKey key) {
- return format("%s client for %s", key.serviceName(), key.deviceId());
+ private String clientName(DeviceId deviceId) {
+ return format("%s client for %s", serviceName, deviceId);
}
}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index 96a1671..3be7706 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -16,18 +16,16 @@
package org.onosproject.grpc.ctl;
-import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Striped;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.grpc.api.GrpcChannelController;
-import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.proto.dummy.Dummy;
-import org.onosproject.grpc.proto.dummy.DummyServiceGrpc;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -38,16 +36,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLException;
+import java.net.URI;
import java.util.Dictionary;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG;
import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG_DEFAULT;
@@ -61,6 +62,12 @@
})
public class GrpcChannelControllerImpl implements GrpcChannelController {
+ private static final String GRPC = "grpc";
+ private static final String GRPCS = "grpcs";
+
+ private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
+ private static final int MEGABYTES = 1024 * 1024;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService componentConfigService;
@@ -72,8 +79,8 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private Map<GrpcChannelId, ManagedChannel> channels;
- private Map<GrpcChannelId, GrpcLoggingInterceptor> interceptors;
+ private Map<URI, ManagedChannel> channels;
+ private Map<URI, GrpcLoggingInterceptor> interceptors;
private final Striped<Lock> channelLocks = Striped.lock(30);
@@ -109,129 +116,109 @@
}
@Override
- public ManagedChannel connectChannel(GrpcChannelId channelId,
- ManagedChannelBuilder<?> channelBuilder) {
- checkNotNull(channelId);
- checkNotNull(channelBuilder);
-
- Lock lock = channelLocks.get(channelId);
- lock.lock();
-
- try {
- if (channels.containsKey(channelId)) {
- throw new IllegalArgumentException(format(
- "A channel with ID '%s' already exists", channelId));
- }
-
- final GrpcLoggingInterceptor interceptor = new GrpcLoggingInterceptor(
- channelId, enableMessageLog);
- channelBuilder.intercept(interceptor);
-
- ManagedChannel channel = channelBuilder.build();
- // Forced connection API is still experimental. Use workaround...
- // channel.getState(true);
- try {
- doDummyMessage(channel);
- } catch (StatusRuntimeException e) {
- interceptor.close();
- shutdownNowAndWait(channel, channelId);
- throw e;
- }
- // If here, channel is open.
- channels.put(channelId, channel);
- interceptors.put(channelId, interceptor);
- return channel;
- } finally {
- lock.unlock();
- }
- }
-
- private void doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
- DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
- .newBlockingStub(channel)
- .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- try {
- //noinspection ResultOfMethodCallIgnored
- dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
- .getDefaultInstance());
- } catch (StatusRuntimeException e) {
- if (!e.getStatus().equals(Status.UNIMPLEMENTED)) {
- // UNIMPLEMENTED means that the server received our message but
- // doesn't know how to handle it. Hence, channel is open.
- throw e;
- }
- }
+ public ManagedChannel create(URI channelUri) {
+ return create(channelUri, makeChannelBuilder(channelUri));
}
@Override
- public void disconnectChannel(GrpcChannelId channelId) {
- checkNotNull(channelId);
+ public ManagedChannel create(URI channelUri, ManagedChannelBuilder<?> channelBuilder) {
+ checkNotNull(channelUri);
+ checkNotNull(channelBuilder);
- Lock lock = channelLocks.get(channelId);
- lock.lock();
+ channelLocks.get(channelUri).lock();
try {
- final ManagedChannel channel = channels.remove(channelId);
- if (channel != null) {
- shutdownNowAndWait(channel, channelId);
+ if (channels.containsKey(channelUri)) {
+ throw new IllegalArgumentException(format(
+ "A channel with ID '%s' already exists", channelUri));
}
- final GrpcLoggingInterceptor interceptor = interceptors.remove(channelId);
+
+ log.info("Creating new gRPC channel {}...", channelUri);
+
+ final GrpcLoggingInterceptor interceptor = new GrpcLoggingInterceptor(
+ channelUri, enableMessageLog);
+ channelBuilder.intercept(interceptor);
+
+ final ManagedChannel channel = channelBuilder.build();
+
+ channels.put(channelUri, channelBuilder.build());
+ interceptors.put(channelUri, interceptor);
+
+ return channel;
+ } finally {
+ channelLocks.get(channelUri).unlock();
+ }
+ }
+
+ private NettyChannelBuilder makeChannelBuilder(URI channelUri) {
+
+ checkArgument(channelUri.getScheme().equals(GRPC)
+ || channelUri.getScheme().equals(GRPCS),
+ format("Server URI scheme must be %s or %s", GRPC, GRPCS));
+ checkArgument(!isNullOrEmpty(channelUri.getHost()),
+ "Server host address should not be empty");
+ checkArgument(channelUri.getPort() > 0 && channelUri.getPort() <= 65535,
+ "Invalid server port");
+
+ final boolean useTls = channelUri.getScheme().equals(GRPCS);
+
+ final NettyChannelBuilder channelBuilder = NettyChannelBuilder
+ .forAddress(channelUri.getHost(),
+ channelUri.getPort())
+ .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
+
+ if (useTls) {
+ try {
+ // Accept any server certificate; this is insecure and
+ // should not be used in production.
+ final SslContext sslContext = GrpcSslContexts.forClient().trustManager(
+ InsecureTrustManagerFactory.INSTANCE).build();
+ channelBuilder.sslContext(sslContext).useTransportSecurity();
+ } catch (SSLException e) {
+ log.error("Failed to build SSL context", e);
+ return null;
+ }
+ } else {
+ channelBuilder.usePlaintext();
+ }
+
+ return channelBuilder;
+ }
+
+ @Override
+ public void destroy(URI channelUri) {
+ checkNotNull(channelUri);
+
+ channelLocks.get(channelUri).lock();
+ try {
+ final ManagedChannel channel = channels.remove(channelUri);
+ if (channel != null) {
+ shutdownNowAndWait(channel, channelUri);
+ }
+ final GrpcLoggingInterceptor interceptor = interceptors.remove(channelUri);
if (interceptor != null) {
interceptor.close();
}
} finally {
- lock.unlock();
+ channelLocks.get(channelUri).unlock();
}
}
- private void shutdownNowAndWait(ManagedChannel channel, GrpcChannelId channelId) {
+ private void shutdownNowAndWait(ManagedChannel channel, URI channelUri) {
try {
if (!channel.shutdownNow()
.awaitTermination(5, TimeUnit.SECONDS)) {
- log.error("Channel '{}' didn't terminate, although we " +
- "triggered a shutdown and waited",
- channelId);
+ log.error("Channel {} did not terminate properly",
+ channelUri);
}
} catch (InterruptedException e) {
- log.warn("Channel {} didn't shutdown in time", channelId);
+ log.warn("Channel {} didn't shutdown in time", channelUri);
Thread.currentThread().interrupt();
}
}
@Override
- public Map<GrpcChannelId, ManagedChannel> getChannels() {
- return ImmutableMap.copyOf(channels);
- }
-
- @Override
- public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
- checkNotNull(channelId);
-
- Lock lock = channelLocks.get(channelId);
- lock.lock();
- try {
- return Optional.ofNullable(channels.get(channelId));
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public CompletableFuture<Boolean> probeChannel(GrpcChannelId channelId) {
- final ManagedChannel channel = channels.get(channelId);
- if (channel == null) {
- log.warn("Unable to find any channel with ID {}, cannot send probe",
- channelId);
- return CompletableFuture.completedFuture(false);
- }
- return CompletableFuture.supplyAsync(() -> {
- try {
- doDummyMessage(channel);
- return true;
- } catch (StatusRuntimeException e) {
- log.debug("Probe for {} failed", channelId);
- log.debug("", e);
- return false;
- }
- });
+ public Optional<ManagedChannel> get(URI channelUri) {
+ checkNotNull(channelUri);
+ return Optional.ofNullable(channels.get(channelUri));
}
}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java
index 45ec5ba..6f06c11 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java
@@ -26,12 +26,12 @@
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.protobuf.lite.ProtoLiteUtils;
-import org.onosproject.grpc.api.GrpcChannelId;
import org.slf4j.Logger;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringJoiner;
@@ -55,13 +55,13 @@
private static final Logger log = getLogger(GrpcLoggingInterceptor.class);
private final AtomicLong callIdGenerator = new AtomicLong();
- private final GrpcChannelId channelId;
+ private final URI channelUri;
private final AtomicBoolean enabled;
private FileWriter writer;
- GrpcLoggingInterceptor(GrpcChannelId channelId, AtomicBoolean enabled) {
- this.channelId = channelId;
+ GrpcLoggingInterceptor(URI channelUri, AtomicBoolean enabled) {
+ this.channelUri = channelUri;
this.enabled = enabled;
}
@@ -69,14 +69,13 @@
if (writer != null) {
return true;
}
- final String safeChName = channelId.id()
- .replaceAll("[^A-Za-z0-9]", "_");
- final String fileName = format("grpc_%s_", safeChName).toLowerCase();
+ final String safeChName = channelUri.toString()
+ .replaceAll("[^A-Za-z0-9]", "_").toLowerCase();
try {
- final File tmpFile = File.createTempFile(fileName, ".log");
+ final File tmpFile = File.createTempFile(safeChName + "_", ".log");
this.writer = new FileWriter(tmpFile);
log.info("Created gRPC call log file for channel {}: {}",
- channelId, tmpFile.getAbsolutePath());
+ channelUri, tmpFile.getAbsolutePath());
return true;
} catch (IOException e) {
log.error("Unable to initialize gRPC call log writer", e);
@@ -90,7 +89,7 @@
return;
}
try {
- log.info("Closing log writer for {}...", channelId);
+ log.info("Closing log writer for {}...", channelUri);
writer.close();
} catch (IOException e) {
log.error("Unable to close gRPC call log writer", e);