Fixed gRPC exception because of channel not terminated
Change-Id: I7a138938a1daa50ab65b6622a372c0275a351535
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
index 5b6771c..4775db5 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
@@ -42,12 +42,16 @@
* <p>
* This method blocks until the channel is open or a timeout expires. By
* default the timeout is {@link #CONNECTION_TIMEOUT_SECONDS} seconds. If
- * the timeout expires, a IOException is thrown.
+ * the timeout expires, an {@link IOException} is thrown. If another channel
+ * with the same ID already exists, an {@link IllegalArgumentException} is
+ * thrown.
*
* @param channelId ID of the channel
* @param channelBuilder builder of the managed channel
* @return the managed channel created
- * @throws IOException if the channel cannot be opened
+ * @throws IOException if the channel cannot be opened
+ * @throws IllegalArgumentException if a channel with the same ID already
+ * exists
*/
ManagedChannel connectChannel(GrpcChannelId channelId,
ManagedChannelBuilder<?> channelBuilder)
@@ -55,7 +59,7 @@
/**
* Closes the gRPC managed channel (i.e., disconnects from the gRPC server)
- * and removed the channel from this controller.
+ * and removes any internal state associated to it.
*
* @param channelId ID of the channel to remove
*/
@@ -72,8 +76,7 @@
/**
* Returns true if the channel associated with the given identifier is open,
- * i.e. the server is able to successfully replies to RPCs, false
- * otherwise.
+ * i.e. the server is able to successfully reply to RPCs, false otherwise.
*
* @param channelId channel ID
* @return true if channel is open, false otherwise.
@@ -81,7 +84,7 @@
boolean isChannelOpen(GrpcChannelId channelId);
/**
- * Returns all channel associated to the given device ID.
+ * Returns all channels associated to the given device ID.
*
* @param deviceId device ID
* @return collection of channels
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index d65934a..7b9290e 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -60,6 +60,7 @@
import java.util.concurrent.locks.Lock;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG;
import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG_DEFAULT;
@@ -73,17 +74,13 @@
public class GrpcChannelControllerImpl implements GrpcChannelController {
// FIXME: Should use message size to determine whether it needs to log the message or not.
- private static final String SET_FORWARDING_PIPELINE_CONFIG_METHOD = "p4.P4Runtime/SetForwardingPipelineConfig";
+ private static final String SET_FORWARDING_PIPELINE_CONFIG_METHOD = "p4.P4Runtime/SetForwardingPipelineConfig";
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService componentConfigService;
- // Hint: set to true to log all gRPC messages received/sent on all channels
- // Does not enable log on existing channels
- private static final boolean DEFAULT_LOG_LEVEL = false;
-
- /** Indicates whether to log all gRPC messages sent and received on all channels. */
- private static boolean enableMessageLog = DEFAULT_LOG_LEVEL;
+ /** Indicates whether to log gRPC messages. */
+ private static boolean enableMessageLog = ENABLE_MESSAGE_LOG_DEFAULT;
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -101,16 +98,17 @@
public void modified(ComponentContext context) {
if (context != null) {
Dictionary<?, ?> properties = context.getProperties();
- enableMessageLog = Tools.isPropertyEnabled(properties, ENABLE_MESSAGE_LOG,
- DEFAULT_LOG_LEVEL);
- log.info("Configured. Log of gRPC messages is {}", enableMessageLog ? "enabled" : "disabled");
+ enableMessageLog = Tools.isPropertyEnabled(
+ properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT);
+ log.info("Configured. Log of gRPC messages is {} for new channels",
+ enableMessageLog ? "enabled" : "disabled");
}
}
@Deactivate
public void deactivate() {
componentConfigService.unregisterProperties(getClass(), false);
- channels.values().forEach(ManagedChannel::shutdown);
+ channels.values().forEach(ManagedChannel::shutdownNow);
channels.clear();
log.info("Stopped");
}
@@ -126,11 +124,23 @@
lock.lock();
try {
- channelBuilder.intercept(new InternalLogChannelInterceptor(channelId));
+ if (channels.containsKey(channelId)) {
+ throw new IllegalArgumentException(format(
+ "A channel with ID '%s' already exists", channelId));
+ }
+ if (enableMessageLog) {
+ channelBuilder.intercept(new InternalLogChannelInterceptor(channelId));
+ }
ManagedChannel channel = channelBuilder.build();
- // Forced connection not yet implemented. Use workaround...
+ // Forced connection API is still experimental. Use workaround...
// channel.getState(true);
- doDummyMessage(channel);
+ try {
+ doDummyMessage(channel);
+ } catch (StatusRuntimeException e) {
+ channel.shutdownNow();
+ throw new IOException(e);
+ }
+ // If here, channel is open.
channels.put(channelId, channel);
return channel;
} finally {
@@ -138,18 +148,20 @@
}
}
- private void doDummyMessage(ManagedChannel channel) throws IOException {
+ private boolean doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
.newBlockingStub(channel)
.withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
try {
- dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
- .getDefaultInstance());
+ return dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
+ .getDefaultInstance()) != null;
} catch (StatusRuntimeException e) {
- if (e.getStatus() != Status.UNIMPLEMENTED) {
+ if (e.getStatus().equals(Status.UNIMPLEMENTED)) {
// UNIMPLEMENTED means that the server received our message but
// doesn't know how to handle it. Hence, channel is open.
- throw new IOException(e);
+ return true;
+ } else {
+ throw e;
}
}
}
@@ -163,16 +175,15 @@
try {
if (!channels.containsKey(channelId)) {
- log.warn("Can't check if channel open for unknown channel ID {}",
+ log.warn("Unknown channel ID '{}', can't check if channel is open",
channelId);
return false;
}
try {
- doDummyMessage(channels.get(channelId));
- return true;
- } catch (IOException e) {
+ return doDummyMessage(channels.get(channelId));
+ } catch (StatusRuntimeException e) {
log.debug("Unable to send dummy message to {}: {}",
- channelId, e.getCause().getMessage());
+ channelId, e.toString());
return false;
}
} finally {
@@ -186,23 +197,24 @@
Lock lock = channelLocks.get(channelId);
lock.lock();
-
try {
- if (!channels.containsKey(channelId)) {
+ final ManagedChannel channel = channels.remove(channelId);
+ if (channel == null) {
// Nothing to do.
return;
}
- ManagedChannel channel = channels.get(channelId);
try {
- channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ if (!channel.shutdownNow()
+ .awaitTermination(5, TimeUnit.SECONDS)) {
+ log.error("Channel '{}' didn't terminate, although we " +
+ "triggered a shut down and waited",
+ channelId);
+ }
} catch (InterruptedException e) {
- log.warn("Channel {} didn't shut down in time.");
- channel.shutdownNow();
+ log.warn("Channel {} didn't shut down in time", channelId);
Thread.currentThread().interrupt();
}
-
- channels.remove(channelId);
} finally {
lock.unlock();
}
@@ -263,8 +275,8 @@
if (enableMessageLog && !methodDescriptor.getFullMethodName()
.startsWith(SET_FORWARDING_PIPELINE_CONFIG_METHOD)) {
log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}",
- channelId, methodDescriptor.getFullMethodName(),
- message.toString());
+ channelId, methodDescriptor.getFullMethodName(),
+ message.toString());
}
super.sendMessage(message);
}
@@ -282,8 +294,8 @@
public void onMessage(RespT message) {
if (enableMessageLog) {
log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}",
- channelId, methodDescriptor.getFullMethodName(),
- message.toString());
+ channelId, methodDescriptor.getFullMethodName(),
+ message.toString());
}
super.onMessage(message);
}