ONOS-6680 Clean up implementation of gRPC controller
Change-Id: If84172d0a2dd64090557542af8ae12920260229f
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
index 269936a..bbcd99f 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
@@ -37,8 +37,6 @@
import org.apache.felix.scr.annotations.Service;
import org.onosproject.grpc.api.GrpcChannelId;
import org.onosproject.grpc.api.GrpcController;
-import org.onosproject.grpc.api.GrpcObserverHandler;
-import org.onosproject.grpc.api.GrpcStreamObserverId;
import org.onosproject.grpc.ctl.dummy.Dummy;
import org.onosproject.grpc.ctl.dummy.DummyServiceGrpc;
import org.onosproject.net.DeviceId;
@@ -69,52 +67,27 @@
// TODO: make configurable at runtime
public static boolean enableMessageLog = false;
- private static final int CONNECTION_TIMEOUT_SECONDS = 20;
+ private final Logger log = LoggerFactory.getLogger(getClass());
- public static final Logger log = LoggerFactory
- .getLogger(GrpcControllerImpl.class);
-
- private Map<GrpcStreamObserverId, GrpcObserverHandler> observers;
private Map<GrpcChannelId, ManagedChannel> channels;
- private Map<GrpcChannelId, ManagedChannelBuilder<?>> channelBuilders;
private final Map<GrpcChannelId, Lock> channelLocks = Maps.newConcurrentMap();
@Activate
public void activate() {
- observers = new ConcurrentHashMap<>();
channels = new ConcurrentHashMap<>();
- channelBuilders = new ConcurrentHashMap<>();
log.info("Started");
}
@Deactivate
public void deactivate() {
channels.values().forEach(ManagedChannel::shutdown);
- observers.clear();
channels.clear();
- channelBuilders.clear();
log.info("Stopped");
}
@Override
- public void addObserver(GrpcStreamObserverId observerId, GrpcObserverHandler grpcObserverHandler) {
- grpcObserverHandler.bindObserver(channels.get(observerId.serviceId().channelId()));
- observers.put(observerId, grpcObserverHandler);
- }
-
- @Override
- public void removeObserver(GrpcStreamObserverId observerId) {
- observers.get(observerId).removeObserver();
- observers.remove(observerId);
- }
-
- @Override
- public Optional<GrpcObserverHandler> getObserverManager(GrpcStreamObserverId observerId) {
- return Optional.ofNullable(observers.get(observerId));
- }
-
- @Override
- public ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder)
+ public ManagedChannel connectChannel(GrpcChannelId channelId,
+ ManagedChannelBuilder<?> channelBuilder)
throws IOException {
checkNotNull(channelId);
checkNotNull(channelBuilder);
@@ -130,7 +103,6 @@
// Forced connection not yet implemented. Use workaround...
// channel.getState(true);
doDummyMessage(channel);
- channelBuilders.put(channelId, channelBuilder);
channels.put(channelId, channel);
return channel;
} finally {
@@ -139,14 +111,16 @@
}
private void doDummyMessage(ManagedChannel channel) throws IOException {
- DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc.newBlockingStub(channel)
+ DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
+ .newBlockingStub(channel)
.withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
try {
- dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse.getDefaultInstance());
+ dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
+ .getDefaultInstance());
} catch (StatusRuntimeException e) {
if (e.getStatus() != Status.UNIMPLEMENTED) {
- // UNIMPLEMENTED means that server received our message but doesn't know how to handle it.
- // Hence, channel is open.
+ // UNIMPLEMENTED means that the server received our message but
+ // doesn't know how to handle it. Hence, channel is open.
throw new IOException(e);
}
}
@@ -161,15 +135,16 @@
try {
if (!channels.containsKey(channelId)) {
- log.warn("Can't check if channel open for unknown channel id {}", channelId);
+ log.warn("Can't check if channel open for unknown channel ID {}",
+ channelId);
return false;
}
try {
doDummyMessage(channels.get(channelId));
return true;
} catch (IOException e) {
- log.warn("Error in sending dummy message to device {}", channelId);
- log.debug("Exception ", e);
+ log.debug("Unable to send dummy message to {}: {}",
+ channelId, e.getCause().getMessage());
return false;
}
} finally {
@@ -199,7 +174,6 @@
}
channels.remove(channelId);
- channelBuilders.remove(channelId);
} finally {
lock.unlock();
}
@@ -249,14 +223,16 @@
}
@Override
- public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
- CallOptions callOptions, Channel channel) {
- return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(
- methodDescriptor, callOptions.withoutWaitForReady())) {
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+ MethodDescriptor<ReqT, RespT> methodDescriptor,
+ CallOptions callOptions, Channel channel) {
+ return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
+ channel.newCall(methodDescriptor, callOptions.withoutWaitForReady())) {
@Override
public void sendMessage(ReqT message) {
- log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}", channelId, methodDescriptor.getFullMethodName(),
+ log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}",
+ channelId, methodDescriptor.getFullMethodName(),
message.toString());
super.sendMessage(message);
}
@@ -272,8 +248,8 @@
@Override
public void onMessage(RespT message) {
- log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}", channelId,
- methodDescriptor.getFullMethodName(),
+ log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}",
+ channelId, methodDescriptor.getFullMethodName(),
message.toString());
super.onMessage(message);
}