ONOS-6680 Clean up implementation of gRPC controller

Change-Id: If84172d0a2dd64090557542af8ae12920260229f
diff --git a/protocols/grpc/BUCK b/protocols/grpc/BUCK
index 7aa98fc..e5e65e1 100644
--- a/protocols/grpc/BUCK
+++ b/protocols/grpc/BUCK
@@ -33,8 +33,6 @@
     title = 'gRPC Protocol Subsystem',
     category = 'Protocol',
     url = 'http://onosproject.org',
-    description = 'Exposes APIs to setup, manage and teardown gRPC Managed channels with devices. ' +
-        'Also offers channel observer registration and removal capabilities. ' +
-        'The exposed APis abstract low level channel operations.',
+    description = 'Exposes APIs to store and manage gRPC channels.',
     included_bundles = BUNDLES,
-)
\ No newline at end of file
+)
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java
index 9ce3e28..87609f4 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java
@@ -24,26 +24,18 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
- * gRPCChannel identifier suitable as an external key.
- * <p>
- * This class is immutable.</p>
+ * gRPC managed channel identifier, unique in the scope of a gRPC controller
+ * instance.
  */
 @Beta
 public final class GrpcChannelId extends Identifier<String> {
 
     private final DeviceId deviceId;
-
     private final String channelName;
 
-    /**
-     * Instantiates a new GrpcChannel id.
-     *
-     * @param deviceId    the device id
-     * @param channelName the name of the channel
-     */
     private GrpcChannelId(DeviceId deviceId, String channelName) {
         super(deviceId.toString() + ":" + channelName);
-        checkNotNull(deviceId, "device id must not be null");
+        checkNotNull(deviceId, "device ID must not be null");
         checkNotNull(channelName, "channel name must not be null");
         checkArgument(!channelName.isEmpty(), "channel name must not be empty");
         this.deviceId = deviceId;
@@ -51,31 +43,32 @@
     }
 
     /**
-     * Returns the deviceId of the device that uses this channel.
+     * Returns the device part of this channel ID.
      *
-     * @return the device Id
+     * @return device ID
      */
     public DeviceId deviceId() {
         return deviceId;
     }
 
     /**
-     * Returns the channel name.
+     * Returns the channel name part of this channel ID.
      *
-     * @return the channel name
+     * @return channel name
      */
     public String channelName() {
         return channelName;
     }
 
     /**
-     * Creates a grpc channel identifier from the specified device id and name provided.
+     * Instantiates a new channel ID for the given device ID and arbitrary
+     * channel name (e.g. the name of the gRPC service).
      *
-     * @param id          device id
+     * @param deviceId    device ID
      * @param channelName name of the channel
-     * @return channel name
+     * @return channel ID
      */
-    public static GrpcChannelId of(DeviceId id, String channelName) {
-        return new GrpcChannelId(id, channelName);
+    public static GrpcChannelId of(DeviceId deviceId, String channelName) {
+        return new GrpcChannelId(deviceId, channelName);
     }
 }
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java
index 1ae15b9..33df22b 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java
@@ -27,84 +27,72 @@
 import java.util.Optional;
 
 /**
- * Abstraction of a gRPC controller. Serves as a one stop shop for obtaining
- * gRPC ManagedChannels to interact with devices and (un)register observers on event streams from the device.
+ * Abstraction of a gRPC controller that stores and manages gRPC channels.
  */
 @Beta
 public interface GrpcController {
 
-    /**
-     * Adds a StreamObserver on a channel specific for a device.
-     *
-     * @param observerId          the id of the observer
-     * @param grpcObserverHandler the manager for the stream.
-     */
-    void addObserver(GrpcStreamObserverId observerId, GrpcObserverHandler grpcObserverHandler);
+    int CONNECTION_TIMEOUT_SECONDS = 20;
 
     /**
-     * Removes the StreamObserver on a channel specific for a device.
+     * Creates a gRPC managed channel from the given builder and opens a
+     * connection to it. If the connection is successful returns the managed
+     * channel object and stores the channel internally, associated with the
+     * given channel ID.
+     * <p>
+     * This method blocks until the channel is open or a timeout expires. By
+     * default the timeout is {@link #CONNECTION_TIMEOUT_SECONDS} seconds. If
+     * the timeout expires, an IOException is thrown.
      *
-     * @param observerId the id of the observer
+     * @param channelId      ID of the channel
+     * @param channelBuilder builder of the managed channel
+     * @return the managed channel created
+     * @throws IOException if the channel cannot be opened
      */
-    void removeObserver(GrpcStreamObserverId observerId);
+    ManagedChannel connectChannel(GrpcChannelId channelId,
+                                  ManagedChannelBuilder<?> channelBuilder)
+            throws IOException;
 
     /**
-     * If present returns the stream observer manager previously added for the given device.
+     * Closes the gRPC managed channel (i.e., disconnects from the gRPC server)
+     * and removes the channel from this controller.
      *
-     * @param observerId the id of the observer.
-     * @return the ObserverManager
-     */
-    Optional<GrpcObserverHandler> getObserverManager(GrpcStreamObserverId observerId);
-
-    /**
-     * Tries to connect to a specific gRPC server, if the connection is successful
-     * returns the ManagedChannel. This method blocks until the channel is setup or a timeout expires.
-     * By default the timeout is 20 seconds. If the timeout expires and thus the channel can't be set up
-     * a IOException is thrown.
-     *
-     * @param channelId      the id of the channel
-     * @param channelBuilder the builder for the managed channel.
-     * @return the ManagedChannel created.
-     * @throws IOException if channel can't be setup.
-     */
-    ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder) throws IOException;
-
-    /**
-     * Disconnects a gRPC device by removing it's ManagedChannel from this controller.
-     *
-     * @param channelId id of the service to remove
+     * @param channelId ID of the channel to remove
      */
     void disconnectChannel(GrpcChannelId channelId);
 
     /**
-     * Gets all ManagedChannels for the gRPC devices.
+     * Returns all channels known by this controller, each one mapped to the ID
+     * passed at creation time.
      *
-     * @return Map of all the ManagedChannels with their identifier saved in this controller
+     * @return map of all the channels with their ID as stored in this
+     * controller
      */
     Map<GrpcChannelId, ManagedChannel> getChannels();
 
     /**
-     * Returns true if the channel associated with the given identifier is open, i.e. the server is able to successfully
-     * responds to RPCs.
+     * Returns true if the channel associated with the given identifier is open,
+     * i.e. the server is able to successfully reply to RPCs, false
+     * otherwise.
      *
-     * @param channelId channel identifier
+     * @param channelId channel ID
      * @return true if channel is open, false otherwise.
      */
     boolean isChannelOpen(GrpcChannelId channelId);
 
     /**
-     * Returns all ManagedChannels associated to the given device identifier.
+     * Returns all channels associated with the given device ID.
      *
-     * @param deviceId the device for which we are interested.
-     * @return collection of all the ManagedChannels saved in this controller
+     * @param deviceId device ID
+     * @return collection of channels
      */
     Collection<ManagedChannel> getChannels(DeviceId deviceId);
 
     /**
-     * If present, returns the managed channel associated with the given identifier.
+     * If present, returns the channel associated with the given ID.
      *
-     * @param channelId the id of the channel
-     * @return the ManagedChannel of the device.
+     * @param channelId channel ID
+     * @return optional channel
      */
     Optional<ManagedChannel> getChannel(GrpcChannelId channelId);
 
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcObserverHandler.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcObserverHandler.java
deleted file mode 100644
index afe403a..0000000
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcObserverHandler.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.grpc.api;
-
-import com.google.common.annotations.Beta;
-import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
-
-import java.util.Optional;
-
-/**
- * Implementation to add or remove an observer to the managed channel.
- *
- */
-@Beta
-public interface GrpcObserverHandler {
-    /**
-     * The implementation of this method adds an
-     * observer on a stub generated on the specific channel.
-     * This method will be called by the gRPC controller.
-     *
-     * @param channel the channel from which to derive the stub.
-     */
-    void bindObserver(ManagedChannel channel);
-
-    /**
-     * The implementation of this method returns the request stream
-     * observer, if any, on a stub generated on the specific channel.
-     *
-     * @return the observer on the stub, empty if observer is server-side unidirectional.
-     */
-    Optional<StreamObserver> requestStreamObserver();
-
-    /**
-     * The implementation of this method removes an
-     * observer on a stub generated on the specific channel.
-     * This method will be called by the gRPC controller.
-     */
-    void removeObserver();
-}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcServiceId.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcServiceId.java
deleted file mode 100644
index 4949ad8..0000000
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcServiceId.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.grpc.api;
-
-import com.google.common.annotations.Beta;
-import org.onlab.util.Identifier;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * gRPCService identifier suitable as an external key.
- * <p>
- * This class is immutable.</p>
- */
-@Beta
-public final class GrpcServiceId extends Identifier<String> {
-
-    private final GrpcChannelId channelId;
-
-    private final String serviceName;
-
-    /**
-     * Instantiates a new gRPC Service id.
-     *
-     * @param channelId   the channel id
-     * @param serviceName the name of the service on that channel
-     */
-    private GrpcServiceId(GrpcChannelId channelId, String serviceName) {
-        super(channelId.toString() + ":" + serviceName);
-        checkNotNull(channelId, "channel id must not be null");
-        checkNotNull(serviceName, "service name must not be null");
-        checkArgument(!serviceName.isEmpty(), "service name must not be empty");
-        this.channelId = channelId;
-        this.serviceName = serviceName;
-    }
-
-    /**
-     * Returns the id of the channel that this service uses.
-     *
-     * @return the channel Id
-     */
-    public GrpcChannelId channelId() {
-        return channelId;
-    }
-
-    /**
-     * Returns the name of this service.
-     *
-     * @return the service name
-     */
-    public String serviceName() {
-        return serviceName;
-    }
-
-    /**
-     * Creates a gRPC Service identifier from the specified device id and
-     * service name provided.
-     *
-     * @param id          channel id
-     * @param serviceName name of the service
-     * @return service name
-     */
-    public static GrpcServiceId of(GrpcChannelId id, String serviceName) {
-        return new GrpcServiceId(id, serviceName);
-    }
-}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcStreamObserverId.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcStreamObserverId.java
deleted file mode 100644
index a827d02..0000000
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcStreamObserverId.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.grpc.api;
-
-import com.google.common.annotations.Beta;
-import org.onlab.util.Identifier;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * GrpcStreamObserver identifier suitable as an external key.
- * <p>
- * This class is immutable.</p>
- */
-@Beta
-public final class GrpcStreamObserverId extends Identifier<String> {
-
-    private GrpcServiceId serviceId;
-    private String streamName;
-
-    /**
-     * Instantiates a new GrpcStreamObserver id.
-     *
-     * @param serviceId  the service id
-     * @param streamName the name of the stream on that device
-     */
-    private GrpcStreamObserverId(GrpcServiceId serviceId, String streamName) {
-        super(serviceId.toString() + ":" + streamName);
-        checkNotNull(serviceId, "service id must not be null");
-        checkNotNull(streamName, "stream name must not be null");
-        checkArgument(!streamName.isEmpty(), "stream name must not be empty");
-        this.serviceId = serviceId;
-        this.streamName = streamName;
-    }
-
-    /**
-     * Returns the id of the service that this stream observer uses.
-     *
-     * @return the service Id
-     */
-    public GrpcServiceId serviceId() {
-        return serviceId;
-    }
-
-    /**
-     * Returns the name of this stream.
-     *
-     * @return the stream name
-     */
-    public String streamName() {
-        return streamName;
-    }
-
-    /**
-     * Creates a gRPC Stream Observer identifier from the specified service id and
-     * stream name provided.
-     *
-     * @param id         service id
-     * @param streamName stream name
-     * @return stream name
-     */
-    public static GrpcStreamObserverId of(GrpcServiceId id, String streamName) {
-        return new GrpcStreamObserverId(id, streamName);
-    }
-}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
index 269936a..bbcd99f 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
@@ -37,8 +37,6 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.grpc.api.GrpcChannelId;
 import org.onosproject.grpc.api.GrpcController;
-import org.onosproject.grpc.api.GrpcObserverHandler;
-import org.onosproject.grpc.api.GrpcStreamObserverId;
 import org.onosproject.grpc.ctl.dummy.Dummy;
 import org.onosproject.grpc.ctl.dummy.DummyServiceGrpc;
 import org.onosproject.net.DeviceId;
@@ -69,52 +67,27 @@
     // TODO: make configurable at runtime
     public static boolean enableMessageLog = false;
 
-    private static final int CONNECTION_TIMEOUT_SECONDS = 20;
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
-    public static final Logger log = LoggerFactory
-            .getLogger(GrpcControllerImpl.class);
-
-    private Map<GrpcStreamObserverId, GrpcObserverHandler> observers;
     private Map<GrpcChannelId, ManagedChannel> channels;
-    private Map<GrpcChannelId, ManagedChannelBuilder<?>> channelBuilders;
     private final Map<GrpcChannelId, Lock> channelLocks = Maps.newConcurrentMap();
 
     @Activate
     public void activate() {
-        observers = new ConcurrentHashMap<>();
         channels = new ConcurrentHashMap<>();
-        channelBuilders = new ConcurrentHashMap<>();
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         channels.values().forEach(ManagedChannel::shutdown);
-        observers.clear();
         channels.clear();
-        channelBuilders.clear();
         log.info("Stopped");
     }
 
     @Override
-    public void addObserver(GrpcStreamObserverId observerId, GrpcObserverHandler grpcObserverHandler) {
-        grpcObserverHandler.bindObserver(channels.get(observerId.serviceId().channelId()));
-        observers.put(observerId, grpcObserverHandler);
-    }
-
-    @Override
-    public void removeObserver(GrpcStreamObserverId observerId) {
-        observers.get(observerId).removeObserver();
-        observers.remove(observerId);
-    }
-
-    @Override
-    public Optional<GrpcObserverHandler> getObserverManager(GrpcStreamObserverId observerId) {
-        return Optional.ofNullable(observers.get(observerId));
-    }
-
-    @Override
-    public ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder)
+    public ManagedChannel connectChannel(GrpcChannelId channelId,
+                                         ManagedChannelBuilder<?> channelBuilder)
             throws IOException {
         checkNotNull(channelId);
         checkNotNull(channelBuilder);
@@ -130,7 +103,6 @@
             // Forced connection not yet implemented. Use workaround...
             // channel.getState(true);
             doDummyMessage(channel);
-            channelBuilders.put(channelId, channelBuilder);
             channels.put(channelId, channel);
             return channel;
         } finally {
@@ -139,14 +111,16 @@
     }
 
     private void doDummyMessage(ManagedChannel channel) throws IOException {
-        DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc.newBlockingStub(channel)
+        DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
+                .newBlockingStub(channel)
                 .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         try {
-            dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse.getDefaultInstance());
+            dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
+                                       .getDefaultInstance());
         } catch (StatusRuntimeException e) {
             if (e.getStatus() != Status.UNIMPLEMENTED) {
-                // UNIMPLEMENTED means that server received our message but doesn't know how to handle it.
-                // Hence, channel is open.
+                // UNIMPLEMENTED means that the server received our message but
+                // doesn't know how to handle it. Hence, channel is open.
                 throw new IOException(e);
             }
         }
@@ -161,15 +135,16 @@
 
         try {
             if (!channels.containsKey(channelId)) {
-                log.warn("Can't check if channel open for unknown channel id {}", channelId);
+                log.warn("Can't check if channel open for unknown channel ID {}",
+                         channelId);
                 return false;
             }
             try {
                 doDummyMessage(channels.get(channelId));
                 return true;
             } catch (IOException e) {
-                log.warn("Error in sending dummy message to device {}", channelId);
-                log.debug("Exception ", e);
+                log.debug("Unable to send dummy message to {}: {}",
+                          channelId, e.getCause().getMessage());
                 return false;
             }
         } finally {
@@ -199,7 +174,6 @@
             }
 
             channels.remove(channelId);
-            channelBuilders.remove(channelId);
         } finally {
             lock.unlock();
         }
@@ -249,14 +223,16 @@
         }
 
         @Override
-        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
-                                                                   CallOptions callOptions, Channel channel) {
-            return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(
-                    methodDescriptor, callOptions.withoutWaitForReady())) {
+        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+                MethodDescriptor<ReqT, RespT> methodDescriptor,
+                CallOptions callOptions, Channel channel) {
+            return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
+                    channel.newCall(methodDescriptor, callOptions.withoutWaitForReady())) {
 
                 @Override
                 public void sendMessage(ReqT message) {
-                    log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}", channelId, methodDescriptor.getFullMethodName(),
+                    log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}",
+                             channelId, methodDescriptor.getFullMethodName(),
                              message.toString());
                     super.sendMessage(message);
                 }
@@ -272,8 +248,8 @@
 
                         @Override
                         public void onMessage(RespT message) {
-                            log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}", channelId,
-                                     methodDescriptor.getFullMethodName(),
+                            log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}",
+                                     channelId, methodDescriptor.getFullMethodName(),
                                      message.toString());
                             super.onMessage(message);
                         }