ONOS-6680 Clean up implementation of gRPC controller
Change-Id: If84172d0a2dd64090557542af8ae12920260229f
diff --git a/protocols/grpc/BUCK b/protocols/grpc/BUCK
index 7aa98fc..e5e65e1 100644
--- a/protocols/grpc/BUCK
+++ b/protocols/grpc/BUCK
@@ -33,8 +33,6 @@
title = 'gRPC Protocol Subsystem',
category = 'Protocol',
url = 'http://onosproject.org',
- description = 'Exposes APIs to setup, manage and teardown gRPC Managed channels with devices. ' +
- 'Also offers channel observer registration and removal capabilities. ' +
- 'The exposed APis abstract low level channel operations.',
+ description = 'Exposes APIs to store and manage gRPC channels.',
included_bundles = BUNDLES,
-)
\ No newline at end of file
+)
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java
index 9ce3e28..87609f4 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java
@@ -24,26 +24,18 @@
import static com.google.common.base.Preconditions.checkNotNull;
/**
- * gRPCChannel identifier suitable as an external key.
- * <p>
- * This class is immutable.</p>
+ * gRPC managed channel identifier, unique in the scope of a gRPC controller
+ * instance.
*/
@Beta
public final class GrpcChannelId extends Identifier<String> {
private final DeviceId deviceId;
-
private final String channelName;
- /**
- * Instantiates a new GrpcChannel id.
- *
- * @param deviceId the device id
- * @param channelName the name of the channel
- */
private GrpcChannelId(DeviceId deviceId, String channelName) {
super(deviceId.toString() + ":" + channelName);
- checkNotNull(deviceId, "device id must not be null");
+ checkNotNull(deviceId, "device ID must not be null");
checkNotNull(channelName, "channel name must not be null");
checkArgument(!channelName.isEmpty(), "channel name must not be empty");
this.deviceId = deviceId;
@@ -51,31 +43,32 @@
}
/**
- * Returns the deviceId of the device that uses this channel.
+ * Returns the device part of this channel ID.
*
- * @return the device Id
+ * @return device ID
*/
public DeviceId deviceId() {
return deviceId;
}
/**
- * Returns the channel name.
+ * Returns the channel name part of this channel ID.
*
- * @return the channel name
+ * @return channel name
*/
public String channelName() {
return channelName;
}
/**
- * Creates a grpc channel identifier from the specified device id and name provided.
+ * Instantiates a new channel ID for the given device ID and arbitrary
+ * channel name (e.g. the name of the gRPC service).
*
- * @param id device id
+ * @param deviceId device ID
* @param channelName name of the channel
- * @return channel name
+ * @return channel ID
*/
- public static GrpcChannelId of(DeviceId id, String channelName) {
- return new GrpcChannelId(id, channelName);
+ public static GrpcChannelId of(DeviceId deviceId, String channelName) {
+ return new GrpcChannelId(deviceId, channelName);
}
}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java
index 1ae15b9..33df22b 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java
@@ -27,84 +27,72 @@
import java.util.Optional;
/**
- * Abstraction of a gRPC controller. Serves as a one stop shop for obtaining
- * gRPC ManagedChannels to interact with devices and (un)register observers on event streams from the device.
+ * Abstraction of a gRPC controller that stores and manages gRPC channels.
*/
@Beta
public interface GrpcController {
- /**
- * Adds a StreamObserver on a channel specific for a device.
- *
- * @param observerId the id of the observer
- * @param grpcObserverHandler the manager for the stream.
- */
- void addObserver(GrpcStreamObserverId observerId, GrpcObserverHandler grpcObserverHandler);
+ int CONNECTION_TIMEOUT_SECONDS = 20;
/**
- * Removes the StreamObserver on a channel specific for a device.
+ * Creates a gRPC managed channel from the given builder and opens a
+ * connection to it. If the connection is successful returns the managed
+ * channel object and stores the channel internally, associated with the
+ * given channel ID.
+ * <p>
+ * This method blocks until the channel is open or a timeout expires. By
+ * default the timeout is {@link #CONNECTION_TIMEOUT_SECONDS} seconds. If
+ * the timeout expires, an IOException is thrown.
*
- * @param observerId the id of the observer
+ * @param channelId ID of the channel
+ * @param channelBuilder builder of the managed channel
+ * @return the managed channel created
+ * @throws IOException if the channel cannot be opened
*/
- void removeObserver(GrpcStreamObserverId observerId);
+ ManagedChannel connectChannel(GrpcChannelId channelId,
+ ManagedChannelBuilder<?> channelBuilder)
+ throws IOException;
/**
- * If present returns the stream observer manager previously added for the given device.
+ * Closes the gRPC managed channel (i.e., disconnects from the gRPC server)
+ * and removes the channel from this controller.
*
- * @param observerId the id of the observer.
- * @return the ObserverManager
- */
- Optional<GrpcObserverHandler> getObserverManager(GrpcStreamObserverId observerId);
-
- /**
- * Tries to connect to a specific gRPC server, if the connection is successful
- * returns the ManagedChannel. This method blocks until the channel is setup or a timeout expires.
- * By default the timeout is 20 seconds. If the timeout expires and thus the channel can't be set up
- * a IOException is thrown.
- *
- * @param channelId the id of the channel
- * @param channelBuilder the builder for the managed channel.
- * @return the ManagedChannel created.
- * @throws IOException if channel can't be setup.
- */
- ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder) throws IOException;
-
- /**
- * Disconnects a gRPC device by removing it's ManagedChannel from this controller.
- *
- * @param channelId id of the service to remove
+ * @param channelId ID of the channel to remove
*/
void disconnectChannel(GrpcChannelId channelId);
/**
- * Gets all ManagedChannels for the gRPC devices.
+ * Returns all channels known by this controller, each one mapped to the ID
+ * passed at creation time.
*
- * @return Map of all the ManagedChannels with their identifier saved in this controller
+ * @return map of all the channels with their ID as stored in this
+ * controller
*/
Map<GrpcChannelId, ManagedChannel> getChannels();
/**
- * Returns true if the channel associated with the given identifier is open, i.e. the server is able to successfully
- * responds to RPCs.
+ * Returns true if the channel associated with the given identifier is open,
+ * i.e. the server is able to successfully reply to RPCs, false
+ * otherwise.
*
- * @param channelId channel identifier
+ * @param channelId channel ID
* @return true if channel is open, false otherwise.
*/
boolean isChannelOpen(GrpcChannelId channelId);
/**
- * Returns all ManagedChannels associated to the given device identifier.
+ * Returns all channels associated with the given device ID.
*
- * @param deviceId the device for which we are interested.
- * @return collection of all the ManagedChannels saved in this controller
+ * @param deviceId device ID
+ * @return collection of channels
*/
Collection<ManagedChannel> getChannels(DeviceId deviceId);
/**
- * If present, returns the managed channel associated with the given identifier.
+ * If present, returns the channel associated with the given ID.
*
- * @param channelId the id of the channel
- * @return the ManagedChannel of the device.
+ * @param channelId channel ID
+ * @return optional channel
*/
Optional<ManagedChannel> getChannel(GrpcChannelId channelId);
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcObserverHandler.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcObserverHandler.java
deleted file mode 100644
index afe403a..0000000
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcObserverHandler.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.grpc.api;
-
-import com.google.common.annotations.Beta;
-import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
-
-import java.util.Optional;
-
-/**
- * Implementation to add or remove an observer to the managed channel.
- *
- */
-@Beta
-public interface GrpcObserverHandler {
- /**
- * The implementation of this method adds an
- * observer on a stub generated on the specific channel.
- * This method will be called by the gRPC controller.
- *
- * @param channel the channel from which to derive the stub.
- */
- void bindObserver(ManagedChannel channel);
-
- /**
- * The implementation of this method returns the request stream
- * observer, if any, on a stub generated on the specific channel.
- *
- * @return the observer on the stub, empty if observer is server-side unidirectional.
- */
- Optional<StreamObserver> requestStreamObserver();
-
- /**
- * The implementation of this method removes an
- * observer on a stub generated on the specific channel.
- * This method will be called by the gRPC controller.
- */
- void removeObserver();
-}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcServiceId.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcServiceId.java
deleted file mode 100644
index 4949ad8..0000000
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcServiceId.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.grpc.api;
-
-import com.google.common.annotations.Beta;
-import org.onlab.util.Identifier;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * gRPCService identifier suitable as an external key.
- * <p>
- * This class is immutable.</p>
- */
-@Beta
-public final class GrpcServiceId extends Identifier<String> {
-
- private final GrpcChannelId channelId;
-
- private final String serviceName;
-
- /**
- * Instantiates a new gRPC Service id.
- *
- * @param channelId the channel id
- * @param serviceName the name of the service on that channel
- */
- private GrpcServiceId(GrpcChannelId channelId, String serviceName) {
- super(channelId.toString() + ":" + serviceName);
- checkNotNull(channelId, "channel id must not be null");
- checkNotNull(serviceName, "service name must not be null");
- checkArgument(!serviceName.isEmpty(), "service name must not be empty");
- this.channelId = channelId;
- this.serviceName = serviceName;
- }
-
- /**
- * Returns the id of the channel that this service uses.
- *
- * @return the channel Id
- */
- public GrpcChannelId channelId() {
- return channelId;
- }
-
- /**
- * Returns the name of this service.
- *
- * @return the service name
- */
- public String serviceName() {
- return serviceName;
- }
-
- /**
- * Creates a gRPC Service identifier from the specified device id and
- * service name provided.
- *
- * @param id channel id
- * @param serviceName name of the service
- * @return service name
- */
- public static GrpcServiceId of(GrpcChannelId id, String serviceName) {
- return new GrpcServiceId(id, serviceName);
- }
-}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcStreamObserverId.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcStreamObserverId.java
deleted file mode 100644
index a827d02..0000000
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcStreamObserverId.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.grpc.api;
-
-import com.google.common.annotations.Beta;
-import org.onlab.util.Identifier;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * GrpcStreamObserver identifier suitable as an external key.
- * <p>
- * This class is immutable.</p>
- */
-@Beta
-public final class GrpcStreamObserverId extends Identifier<String> {
-
- private GrpcServiceId serviceId;
- private String streamName;
-
- /**
- * Instantiates a new GrpcStreamObserver id.
- *
- * @param serviceId the service id
- * @param streamName the name of the stream on that device
- */
- private GrpcStreamObserverId(GrpcServiceId serviceId, String streamName) {
- super(serviceId.toString() + ":" + streamName);
- checkNotNull(serviceId, "service id must not be null");
- checkNotNull(streamName, "stream name must not be null");
- checkArgument(!streamName.isEmpty(), "stream name must not be empty");
- this.serviceId = serviceId;
- this.streamName = streamName;
- }
-
- /**
- * Returns the id of the service that this stream observer uses.
- *
- * @return the service Id
- */
- public GrpcServiceId serviceId() {
- return serviceId;
- }
-
- /**
- * Returns the name of this stream.
- *
- * @return the stream name
- */
- public String streamName() {
- return streamName;
- }
-
- /**
- * Creates a gRPC Stream Observer identifier from the specified service id and
- * stream name provided.
- *
- * @param id service id
- * @param streamName stream name
- * @return stream name
- */
- public static GrpcStreamObserverId of(GrpcServiceId id, String streamName) {
- return new GrpcStreamObserverId(id, streamName);
- }
-}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
index 269936a..bbcd99f 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
@@ -37,8 +37,6 @@
import org.apache.felix.scr.annotations.Service;
import org.onosproject.grpc.api.GrpcChannelId;
import org.onosproject.grpc.api.GrpcController;
-import org.onosproject.grpc.api.GrpcObserverHandler;
-import org.onosproject.grpc.api.GrpcStreamObserverId;
import org.onosproject.grpc.ctl.dummy.Dummy;
import org.onosproject.grpc.ctl.dummy.DummyServiceGrpc;
import org.onosproject.net.DeviceId;
@@ -69,52 +67,27 @@
// TODO: make configurable at runtime
public static boolean enableMessageLog = false;
- private static final int CONNECTION_TIMEOUT_SECONDS = 20;
+ private final Logger log = LoggerFactory.getLogger(getClass());
- public static final Logger log = LoggerFactory
- .getLogger(GrpcControllerImpl.class);
-
- private Map<GrpcStreamObserverId, GrpcObserverHandler> observers;
private Map<GrpcChannelId, ManagedChannel> channels;
- private Map<GrpcChannelId, ManagedChannelBuilder<?>> channelBuilders;
private final Map<GrpcChannelId, Lock> channelLocks = Maps.newConcurrentMap();
@Activate
public void activate() {
- observers = new ConcurrentHashMap<>();
channels = new ConcurrentHashMap<>();
- channelBuilders = new ConcurrentHashMap<>();
log.info("Started");
}
@Deactivate
public void deactivate() {
channels.values().forEach(ManagedChannel::shutdown);
- observers.clear();
channels.clear();
- channelBuilders.clear();
log.info("Stopped");
}
@Override
- public void addObserver(GrpcStreamObserverId observerId, GrpcObserverHandler grpcObserverHandler) {
- grpcObserverHandler.bindObserver(channels.get(observerId.serviceId().channelId()));
- observers.put(observerId, grpcObserverHandler);
- }
-
- @Override
- public void removeObserver(GrpcStreamObserverId observerId) {
- observers.get(observerId).removeObserver();
- observers.remove(observerId);
- }
-
- @Override
- public Optional<GrpcObserverHandler> getObserverManager(GrpcStreamObserverId observerId) {
- return Optional.ofNullable(observers.get(observerId));
- }
-
- @Override
- public ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder)
+ public ManagedChannel connectChannel(GrpcChannelId channelId,
+ ManagedChannelBuilder<?> channelBuilder)
throws IOException {
checkNotNull(channelId);
checkNotNull(channelBuilder);
@@ -130,7 +103,6 @@
// Forced connection not yet implemented. Use workaround...
// channel.getState(true);
doDummyMessage(channel);
- channelBuilders.put(channelId, channelBuilder);
channels.put(channelId, channel);
return channel;
} finally {
@@ -139,14 +111,16 @@
}
private void doDummyMessage(ManagedChannel channel) throws IOException {
- DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc.newBlockingStub(channel)
+ DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
+ .newBlockingStub(channel)
.withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
try {
- dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse.getDefaultInstance());
+ dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
+ .getDefaultInstance());
} catch (StatusRuntimeException e) {
if (e.getStatus() != Status.UNIMPLEMENTED) {
- // UNIMPLEMENTED means that server received our message but doesn't know how to handle it.
- // Hence, channel is open.
+ // UNIMPLEMENTED means that the server received our message but
+ // doesn't know how to handle it. Hence, channel is open.
throw new IOException(e);
}
}
@@ -161,15 +135,16 @@
try {
if (!channels.containsKey(channelId)) {
- log.warn("Can't check if channel open for unknown channel id {}", channelId);
+ log.warn("Can't check if channel open for unknown channel ID {}",
+ channelId);
return false;
}
try {
doDummyMessage(channels.get(channelId));
return true;
} catch (IOException e) {
- log.warn("Error in sending dummy message to device {}", channelId);
- log.debug("Exception ", e);
+ log.debug("Unable to send dummy message to {}: {}",
+ channelId, e.getCause().getMessage());
return false;
}
} finally {
@@ -199,7 +174,6 @@
}
channels.remove(channelId);
- channelBuilders.remove(channelId);
} finally {
lock.unlock();
}
@@ -249,14 +223,16 @@
}
@Override
- public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
- CallOptions callOptions, Channel channel) {
- return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(
- methodDescriptor, callOptions.withoutWaitForReady())) {
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+ MethodDescriptor<ReqT, RespT> methodDescriptor,
+ CallOptions callOptions, Channel channel) {
+ return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
+ channel.newCall(methodDescriptor, callOptions.withoutWaitForReady())) {
@Override
public void sendMessage(ReqT message) {
- log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}", channelId, methodDescriptor.getFullMethodName(),
+ log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}",
+ channelId, methodDescriptor.getFullMethodName(),
message.toString());
super.sendMessage(message);
}
@@ -272,8 +248,8 @@
@Override
public void onMessage(RespT message) {
- log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}", channelId,
- methodDescriptor.getFullMethodName(),
+ log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}",
+ channelId, methodDescriptor.getFullMethodName(),
message.toString());
super.onMessage(message);
}