Fixed gRPC exception because of channel not terminated

Change-Id: I7a138938a1daa50ab65b6622a372c0275a351535
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
index 5b6771c..4775db5 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
@@ -42,12 +42,16 @@
      * <p>
      * This method blocks until the channel is open or a timeout expires. By
      * default the timeout is {@link #CONNECTION_TIMEOUT_SECONDS} seconds. If
-     * the timeout expires, a IOException is thrown.
+     * the timeout expires, an {@link IOException} is thrown. If another channel
+     * with the same ID already exists, an {@link IllegalArgumentException} is
+     * thrown.
      *
      * @param channelId      ID of the channel
      * @param channelBuilder builder of the managed channel
      * @return the managed channel created
-     * @throws IOException if the channel cannot be opened
+     * @throws IOException              if the channel cannot be opened
+     * @throws IllegalArgumentException if a channel with the same ID already
+     *                                  exists
      */
     ManagedChannel connectChannel(GrpcChannelId channelId,
                                   ManagedChannelBuilder<?> channelBuilder)
@@ -55,7 +59,7 @@
 
     /**
      * Closes the gRPC managed channel (i.e., disconnects from the gRPC server)
-     * and removed the channel from this controller.
+     * and removes any internal state associated to it.
      *
      * @param channelId ID of the channel to remove
      */
@@ -72,8 +76,7 @@
 
     /**
      * Returns true if the channel associated with the given identifier is open,
-     * i.e. the server is able to successfully replies to RPCs, false
-     * otherwise.
+     * i.e. the server is able to successfully reply to RPCs, false otherwise.
      *
      * @param channelId channel ID
      * @return true if channel is open, false otherwise.
@@ -81,7 +84,7 @@
     boolean isChannelOpen(GrpcChannelId channelId);
 
     /**
-     * Returns all channel associated to the given device ID.
+     * Returns all channels associated to the given device ID.
      *
      * @param deviceId device ID
      * @return collection of channels
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index d65934a..7b9290e 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -60,6 +60,7 @@
 import java.util.concurrent.locks.Lock;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
 import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG;
 import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG_DEFAULT;
 
@@ -73,17 +74,13 @@
 public class GrpcChannelControllerImpl implements GrpcChannelController {
 
     // FIXME: Should use message size to determine whether it needs to log the message or not.
-    private  static final String SET_FORWARDING_PIPELINE_CONFIG_METHOD = "p4.P4Runtime/SetForwardingPipelineConfig";
+    private static final String SET_FORWARDING_PIPELINE_CONFIG_METHOD = "p4.P4Runtime/SetForwardingPipelineConfig";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService componentConfigService;
 
-    // Hint: set to true to log all gRPC messages received/sent on all channels
-    // Does not enable log on existing channels
-    private static final boolean DEFAULT_LOG_LEVEL = false;
-
-    /** Indicates whether to log all gRPC messages sent and received on all channels. */
-    private static boolean enableMessageLog = DEFAULT_LOG_LEVEL;
+    /** Indicates whether to log gRPC messages. */
+    private static boolean enableMessageLog = ENABLE_MESSAGE_LOG_DEFAULT;
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -101,16 +98,17 @@
     public void modified(ComponentContext context) {
         if (context != null) {
             Dictionary<?, ?> properties = context.getProperties();
-            enableMessageLog = Tools.isPropertyEnabled(properties, ENABLE_MESSAGE_LOG,
-                    DEFAULT_LOG_LEVEL);
-            log.info("Configured. Log of gRPC messages is {}", enableMessageLog ? "enabled" : "disabled");
+            enableMessageLog = Tools.isPropertyEnabled(
+                    properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT);
+            log.info("Configured. Log of gRPC messages is {} for new channels",
+                     enableMessageLog ? "enabled" : "disabled");
         }
     }
 
     @Deactivate
     public void deactivate() {
         componentConfigService.unregisterProperties(getClass(), false);
-        channels.values().forEach(ManagedChannel::shutdown);
+        channels.values().forEach(ManagedChannel::shutdownNow);
         channels.clear();
         log.info("Stopped");
     }
@@ -126,11 +124,23 @@
         lock.lock();
 
         try {
-            channelBuilder.intercept(new InternalLogChannelInterceptor(channelId));
+            if (channels.containsKey(channelId)) {
+                throw new IllegalArgumentException(format(
+                        "A channel with ID '%s' already exists", channelId));
+            }
+            if (enableMessageLog) {
+                channelBuilder.intercept(new InternalLogChannelInterceptor(channelId));
+            }
             ManagedChannel channel = channelBuilder.build();
-            // Forced connection not yet implemented. Use workaround...
+            // Forced connection API is still experimental. Use workaround...
             // channel.getState(true);
-            doDummyMessage(channel);
+            try {
+                doDummyMessage(channel);
+            } catch (StatusRuntimeException e) {
+                channel.shutdownNow();
+                throw new IOException(e);
+            }
+            // If here, channel is open.
             channels.put(channelId, channel);
             return channel;
         } finally {
@@ -138,18 +148,20 @@
         }
     }
 
-    private void doDummyMessage(ManagedChannel channel) throws IOException {
+    private boolean doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
         DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
                 .newBlockingStub(channel)
                 .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         try {
-            dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
-                                       .getDefaultInstance());
+            return dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
+                                              .getDefaultInstance()) != null;
         } catch (StatusRuntimeException e) {
-            if (e.getStatus() != Status.UNIMPLEMENTED) {
+            if (e.getStatus().equals(Status.UNIMPLEMENTED)) {
                 // UNIMPLEMENTED means that the server received our message but
                 // doesn't know how to handle it. Hence, channel is open.
-                throw new IOException(e);
+                return true;
+            } else {
+                throw e;
             }
         }
     }
@@ -163,16 +175,15 @@
 
         try {
             if (!channels.containsKey(channelId)) {
-                log.warn("Can't check if channel open for unknown channel ID {}",
+                log.warn("Unknown channel ID '{}', can't check if channel is open",
                          channelId);
                 return false;
             }
             try {
-                doDummyMessage(channels.get(channelId));
-                return true;
-            } catch (IOException e) {
+                return doDummyMessage(channels.get(channelId));
+            } catch (StatusRuntimeException e) {
                 log.debug("Unable to send dummy message to {}: {}",
-                          channelId, e.getCause().getMessage());
+                          channelId, e.toString());
                 return false;
             }
         } finally {
@@ -186,23 +197,24 @@
 
         Lock lock = channelLocks.get(channelId);
         lock.lock();
-
         try {
-            if (!channels.containsKey(channelId)) {
+            final ManagedChannel channel = channels.remove(channelId);
+            if (channel == null) {
                 // Nothing to do.
                 return;
             }
-            ManagedChannel channel = channels.get(channelId);
 
             try {
-                channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+                if (!channel.shutdownNow()
+                        .awaitTermination(5, TimeUnit.SECONDS)) {
+                    log.error("Channel '{}' didn't terminate, although we " +
+                                      "triggered a shut down and waited",
+                              channelId);
+                }
             } catch (InterruptedException e) {
-                log.warn("Channel {} didn't shut down in time.");
-                channel.shutdownNow();
+                log.warn("Channel {} didn't shut down in time", channelId);
                 Thread.currentThread().interrupt();
             }
-
-            channels.remove(channelId);
         } finally {
             lock.unlock();
         }
@@ -263,8 +275,8 @@
                     if (enableMessageLog && !methodDescriptor.getFullMethodName()
                             .startsWith(SET_FORWARDING_PIPELINE_CONFIG_METHOD)) {
                         log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}",
-                                channelId, methodDescriptor.getFullMethodName(),
-                                message.toString());
+                                 channelId, methodDescriptor.getFullMethodName(),
+                                 message.toString());
                     }
                     super.sendMessage(message);
                 }
@@ -282,8 +294,8 @@
                         public void onMessage(RespT message) {
                             if (enableMessageLog) {
                                 log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}",
-                                        channelId, methodDescriptor.getFullMethodName(),
-                                        message.toString());
+                                         channelId, methodDescriptor.getFullMethodName(),
+                                         message.toString());
                             }
                             super.onMessage(message);
                         }