blob: 6f233c8f8074c5af91c6359e71105984b5c1e56b [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.grpc.ctl;
18
Carmelo Casconefb924072017-08-29 20:21:55 +020019import com.google.common.collect.ImmutableMap;
Andrea Campanella378e21a2017-06-07 12:09:59 +020020import com.google.common.collect.ImmutableSet;
Carmelo Casconefb924072017-08-29 20:21:55 +020021import com.google.common.collect.Maps;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040022import io.grpc.CallOptions;
23import io.grpc.Channel;
24import io.grpc.ClientCall;
25import io.grpc.ClientInterceptor;
26import io.grpc.ForwardingClientCall;
27import io.grpc.ForwardingClientCallListener;
Andrea Campanella378e21a2017-06-07 12:09:59 +020028import io.grpc.ManagedChannel;
29import io.grpc.ManagedChannelBuilder;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040030import io.grpc.Metadata;
31import io.grpc.MethodDescriptor;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040032import io.grpc.Status;
33import io.grpc.StatusRuntimeException;
Andrea Campanella378e21a2017-06-07 12:09:59 +020034import org.apache.felix.scr.annotations.Activate;
35import org.apache.felix.scr.annotations.Component;
36import org.apache.felix.scr.annotations.Deactivate;
37import org.apache.felix.scr.annotations.Service;
38import org.onosproject.grpc.api.GrpcChannelId;
39import org.onosproject.grpc.api.GrpcController;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040040import org.onosproject.grpc.ctl.dummy.Dummy;
41import org.onosproject.grpc.ctl.dummy.DummyServiceGrpc;
Andrea Campanella378e21a2017-06-07 12:09:59 +020042import org.onosproject.net.DeviceId;
43import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
Carmelo Cascone59f57de2017-07-11 19:55:09 -040046import java.io.IOException;
Andrea Campanella378e21a2017-06-07 12:09:59 +020047import java.util.Collection;
48import java.util.HashSet;
49import java.util.Map;
50import java.util.Optional;
51import java.util.Set;
52import java.util.concurrent.ConcurrentHashMap;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040053import java.util.concurrent.TimeUnit;
Carmelo Casconefb924072017-08-29 20:21:55 +020054import java.util.concurrent.locks.Lock;
55import java.util.concurrent.locks.ReentrantLock;
56
57import static com.google.common.base.Preconditions.checkNotNull;
Andrea Campanella378e21a2017-06-07 12:09:59 +020058
59/**
60 * Default implementation of the GrpcController.
61 */
62@Component(immediate = true)
63@Service
64public class GrpcControllerImpl implements GrpcController {
65
Carmelo Cascone8d99b172017-07-18 17:26:31 -040066 // Hint: set to true to log all gRPC messages received/sent on all channels.
67 // TODO: make configurable at runtime
68 public static boolean enableMessageLog = false;
69
Carmelo Cascone47a853b2018-01-05 02:40:58 +010070 private final Logger log = LoggerFactory.getLogger(getClass());
Carmelo Cascone59f57de2017-07-11 19:55:09 -040071
Andrea Campanella378e21a2017-06-07 12:09:59 +020072 private Map<GrpcChannelId, ManagedChannel> channels;
Carmelo Casconefb924072017-08-29 20:21:55 +020073 private final Map<GrpcChannelId, Lock> channelLocks = Maps.newConcurrentMap();
Andrea Campanella378e21a2017-06-07 12:09:59 +020074
75 @Activate
76 public void activate() {
Andrea Campanella378e21a2017-06-07 12:09:59 +020077 channels = new ConcurrentHashMap<>();
Andrea Campanella378e21a2017-06-07 12:09:59 +020078 log.info("Started");
79 }
80
81 @Deactivate
82 public void deactivate() {
83 channels.values().forEach(ManagedChannel::shutdown);
Andrea Campanella378e21a2017-06-07 12:09:59 +020084 channels.clear();
Andrea Campanella378e21a2017-06-07 12:09:59 +020085 log.info("Stopped");
86 }
87
88 @Override
Carmelo Cascone47a853b2018-01-05 02:40:58 +010089 public ManagedChannel connectChannel(GrpcChannelId channelId,
90 ManagedChannelBuilder<?> channelBuilder)
Carmelo Cascone59f57de2017-07-11 19:55:09 -040091 throws IOException {
Carmelo Casconefb924072017-08-29 20:21:55 +020092 checkNotNull(channelId);
93 checkNotNull(channelBuilder);
Carmelo Cascone8d99b172017-07-18 17:26:31 -040094
Carmelo Casconefb924072017-08-29 20:21:55 +020095 Lock lock = channelLocks.computeIfAbsent(channelId, k -> new ReentrantLock());
96 lock.lock();
97
98 try {
99 if (enableMessageLog) {
100 channelBuilder.intercept(new InternalLogChannelInterceptor(channelId));
101 }
102 ManagedChannel channel = channelBuilder.build();
103 // Forced connection not yet implemented. Use workaround...
104 // channel.getState(true);
105 doDummyMessage(channel);
Carmelo Casconefb924072017-08-29 20:21:55 +0200106 channels.put(channelId, channel);
107 return channel;
108 } finally {
109 lock.unlock();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400110 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200111 }
112
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400113 private void doDummyMessage(ManagedChannel channel) throws IOException {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100114 DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
115 .newBlockingStub(channel)
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400116 .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
117 try {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100118 dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
119 .getDefaultInstance());
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400120 } catch (StatusRuntimeException e) {
121 if (e.getStatus() != Status.UNIMPLEMENTED) {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100122 // UNIMPLEMENTED means that the server received our message but
123 // doesn't know how to handle it. Hence, channel is open.
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400124 throw new IOException(e);
125 }
126 }
127 }
128
129 @Override
130 public boolean isChannelOpen(GrpcChannelId channelId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200131 checkNotNull(channelId);
132
133 Lock lock = channelLocks.computeIfAbsent(channelId, k -> new ReentrantLock());
134 lock.lock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400135
136 try {
Carmelo Casconefb924072017-08-29 20:21:55 +0200137 if (!channels.containsKey(channelId)) {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100138 log.warn("Can't check if channel open for unknown channel ID {}",
139 channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200140 return false;
141 }
142 try {
143 doDummyMessage(channels.get(channelId));
144 return true;
145 } catch (IOException e) {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100146 log.debug("Unable to send dummy message to {}: {}",
147 channelId, e.getCause().getMessage());
Carmelo Casconefb924072017-08-29 20:21:55 +0200148 return false;
149 }
150 } finally {
151 lock.unlock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400152 }
153 }
154
Andrea Campanella378e21a2017-06-07 12:09:59 +0200155 @Override
156 public void disconnectChannel(GrpcChannelId channelId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200157 checkNotNull(channelId);
158
159 Lock lock = channelLocks.computeIfAbsent(channelId, k -> new ReentrantLock());
160 lock.lock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400161
162 try {
Carmelo Casconefb924072017-08-29 20:21:55 +0200163 if (!channels.containsKey(channelId)) {
164 // Nothing to do.
165 return;
166 }
167 ManagedChannel channel = channels.get(channelId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400168
Carmelo Casconefb924072017-08-29 20:21:55 +0200169 try {
170 channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
171 } catch (InterruptedException e) {
172 log.warn("Channel {} didn't shut down in time.");
173 channel.shutdownNow();
Ray Milkey5c7d4882018-02-05 14:50:39 -0800174 Thread.currentThread().interrupt();
Carmelo Casconefb924072017-08-29 20:21:55 +0200175 }
176
177 channels.remove(channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200178 } finally {
179 lock.unlock();
180 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200181 }
182
183 @Override
184 public Map<GrpcChannelId, ManagedChannel> getChannels() {
Carmelo Casconefb924072017-08-29 20:21:55 +0200185 return ImmutableMap.copyOf(channels);
Andrea Campanella378e21a2017-06-07 12:09:59 +0200186 }
187
188 @Override
189 public Collection<ManagedChannel> getChannels(final DeviceId deviceId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200190 checkNotNull(deviceId);
Andrea Campanella378e21a2017-06-07 12:09:59 +0200191 final Set<ManagedChannel> deviceChannels = new HashSet<>();
192 channels.forEach((k, v) -> {
193 if (k.deviceId().equals(deviceId)) {
194 deviceChannels.add(v);
195 }
196 });
197
198 return ImmutableSet.copyOf(deviceChannels);
199 }
200
201 @Override
202 public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200203 checkNotNull(channelId);
204
205 Lock lock = channelLocks.computeIfAbsent(channelId, k -> new ReentrantLock());
206 lock.lock();
207
208 try {
209 return Optional.ofNullable(channels.get(channelId));
210 } finally {
211 lock.unlock();
212 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200213 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400214
215 /**
216 * gRPC client interceptor that logs all messages sent and received.
217 */
218 private final class InternalLogChannelInterceptor implements ClientInterceptor {
219
220 private final GrpcChannelId channelId;
221
222 private InternalLogChannelInterceptor(GrpcChannelId channelId) {
223 this.channelId = channelId;
224 }
225
226 @Override
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100227 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
228 MethodDescriptor<ReqT, RespT> methodDescriptor,
229 CallOptions callOptions, Channel channel) {
230 return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
231 channel.newCall(methodDescriptor, callOptions.withoutWaitForReady())) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400232
233 @Override
234 public void sendMessage(ReqT message) {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100235 log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}",
236 channelId, methodDescriptor.getFullMethodName(),
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400237 message.toString());
238 super.sendMessage(message);
239 }
240
241 @Override
242 public void start(Listener<RespT> responseListener, Metadata headers) {
243
244 ClientCall.Listener<RespT> listener = new ForwardingClientCallListener<RespT>() {
245 @Override
246 protected Listener<RespT> delegate() {
247 return responseListener;
248 }
249
250 @Override
251 public void onMessage(RespT message) {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100252 log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}",
253 channelId, methodDescriptor.getFullMethodName(),
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400254 message.toString());
255 super.onMessage(message);
256 }
257 };
258 super.start(listener, headers);
259 }
260 };
261 }
262 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200263}