ONOS-6680 Clean up implementation of gRPC controller

Change-Id: If84172d0a2dd64090557542af8ae12920260229f
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
index 269936a..bbcd99f 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
@@ -37,8 +37,6 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.grpc.api.GrpcChannelId;
 import org.onosproject.grpc.api.GrpcController;
-import org.onosproject.grpc.api.GrpcObserverHandler;
-import org.onosproject.grpc.api.GrpcStreamObserverId;
 import org.onosproject.grpc.ctl.dummy.Dummy;
 import org.onosproject.grpc.ctl.dummy.DummyServiceGrpc;
 import org.onosproject.net.DeviceId;
@@ -69,52 +67,27 @@
     // TODO: make configurable at runtime
     public static boolean enableMessageLog = false;
 
-    private static final int CONNECTION_TIMEOUT_SECONDS = 20;
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
-    public static final Logger log = LoggerFactory
-            .getLogger(GrpcControllerImpl.class);
-
-    private Map<GrpcStreamObserverId, GrpcObserverHandler> observers;
     private Map<GrpcChannelId, ManagedChannel> channels;
-    private Map<GrpcChannelId, ManagedChannelBuilder<?>> channelBuilders;
     private final Map<GrpcChannelId, Lock> channelLocks = Maps.newConcurrentMap();
 
     @Activate
     public void activate() {
-        observers = new ConcurrentHashMap<>();
         channels = new ConcurrentHashMap<>();
-        channelBuilders = new ConcurrentHashMap<>();
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         channels.values().forEach(ManagedChannel::shutdown);
-        observers.clear();
         channels.clear();
-        channelBuilders.clear();
         log.info("Stopped");
     }
 
     @Override
-    public void addObserver(GrpcStreamObserverId observerId, GrpcObserverHandler grpcObserverHandler) {
-        grpcObserverHandler.bindObserver(channels.get(observerId.serviceId().channelId()));
-        observers.put(observerId, grpcObserverHandler);
-    }
-
-    @Override
-    public void removeObserver(GrpcStreamObserverId observerId) {
-        observers.get(observerId).removeObserver();
-        observers.remove(observerId);
-    }
-
-    @Override
-    public Optional<GrpcObserverHandler> getObserverManager(GrpcStreamObserverId observerId) {
-        return Optional.ofNullable(observers.get(observerId));
-    }
-
-    @Override
-    public ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder)
+    public ManagedChannel connectChannel(GrpcChannelId channelId,
+                                         ManagedChannelBuilder<?> channelBuilder)
             throws IOException {
         checkNotNull(channelId);
         checkNotNull(channelBuilder);
@@ -130,7 +103,6 @@
             // Forced connection not yet implemented. Use workaround...
             // channel.getState(true);
             doDummyMessage(channel);
-            channelBuilders.put(channelId, channelBuilder);
             channels.put(channelId, channel);
             return channel;
         } finally {
@@ -139,14 +111,16 @@
     }
 
     private void doDummyMessage(ManagedChannel channel) throws IOException {
-        DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc.newBlockingStub(channel)
+        DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
+                .newBlockingStub(channel)
                 .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         try {
-            dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse.getDefaultInstance());
+            dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
+                                       .getDefaultInstance());
         } catch (StatusRuntimeException e) {
             if (e.getStatus() != Status.UNIMPLEMENTED) {
-                // UNIMPLEMENTED means that server received our message but doesn't know how to handle it.
-                // Hence, channel is open.
+                // UNIMPLEMENTED means that the server received our message but
+                // doesn't know how to handle it. Hence, channel is open.
                 throw new IOException(e);
             }
         }
@@ -161,15 +135,16 @@
 
         try {
             if (!channels.containsKey(channelId)) {
-                log.warn("Can't check if channel open for unknown channel id {}", channelId);
+                log.warn("Can't check if channel open for unknown channel ID {}",
+                         channelId);
                 return false;
             }
             try {
                 doDummyMessage(channels.get(channelId));
                 return true;
             } catch (IOException e) {
-                log.warn("Error in sending dummy message to device {}", channelId);
-                log.debug("Exception ", e);
+                log.debug("Unable to send dummy message to {}: {}",
+                          channelId, e.getCause().getMessage());
                 return false;
             }
         } finally {
@@ -199,7 +174,6 @@
             }
 
             channels.remove(channelId);
-            channelBuilders.remove(channelId);
         } finally {
             lock.unlock();
         }
@@ -249,14 +223,16 @@
         }
 
         @Override
-        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
-                                                                   CallOptions callOptions, Channel channel) {
-            return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(
-                    methodDescriptor, callOptions.withoutWaitForReady())) {
+        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+                MethodDescriptor<ReqT, RespT> methodDescriptor,
+                CallOptions callOptions, Channel channel) {
+            return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
+                    channel.newCall(methodDescriptor, callOptions.withoutWaitForReady())) {
 
                 @Override
                 public void sendMessage(ReqT message) {
-                    log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}", channelId, methodDescriptor.getFullMethodName(),
+                    log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}",
+                             channelId, methodDescriptor.getFullMethodName(),
                              message.toString());
                     super.sendMessage(message);
                 }
@@ -272,8 +248,8 @@
 
                         @Override
                         public void onMessage(RespT message) {
-                            log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}", channelId,
-                                     methodDescriptor.getFullMethodName(),
+                            log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}",
+                                     channelId, methodDescriptor.getFullMethodName(),
                                      message.toString());
                             super.onMessage(message);
                         }