blob: 91b9db0cc6be760d465863049134b147597887ce [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.grpc.ctl;
18
Carmelo Casconefb924072017-08-29 20:21:55 +020019import com.google.common.collect.ImmutableMap;
Andrea Campanella378e21a2017-06-07 12:09:59 +020020import com.google.common.collect.ImmutableSet;
Carmelo Casconefb924072017-08-29 20:21:55 +020021import com.google.common.collect.Maps;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040022import io.grpc.CallOptions;
23import io.grpc.Channel;
24import io.grpc.ClientCall;
25import io.grpc.ClientInterceptor;
26import io.grpc.ForwardingClientCall;
27import io.grpc.ForwardingClientCallListener;
Andrea Campanella378e21a2017-06-07 12:09:59 +020028import io.grpc.ManagedChannel;
29import io.grpc.ManagedChannelBuilder;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040030import io.grpc.Metadata;
31import io.grpc.MethodDescriptor;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040032import io.grpc.Status;
33import io.grpc.StatusRuntimeException;
Andrea Campanella378e21a2017-06-07 12:09:59 +020034import org.apache.felix.scr.annotations.Activate;
35import org.apache.felix.scr.annotations.Component;
36import org.apache.felix.scr.annotations.Deactivate;
37import org.apache.felix.scr.annotations.Service;
38import org.onosproject.grpc.api.GrpcChannelId;
39import org.onosproject.grpc.api.GrpcController;
40import org.onosproject.grpc.api.GrpcObserverHandler;
41import org.onosproject.grpc.api.GrpcStreamObserverId;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040042import org.onosproject.grpc.ctl.dummy.Dummy;
43import org.onosproject.grpc.ctl.dummy.DummyServiceGrpc;
Andrea Campanella378e21a2017-06-07 12:09:59 +020044import org.onosproject.net.DeviceId;
45import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
47
Carmelo Cascone59f57de2017-07-11 19:55:09 -040048import java.io.IOException;
Andrea Campanella378e21a2017-06-07 12:09:59 +020049import java.util.Collection;
50import java.util.HashSet;
51import java.util.Map;
52import java.util.Optional;
53import java.util.Set;
54import java.util.concurrent.ConcurrentHashMap;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040055import java.util.concurrent.TimeUnit;
Carmelo Casconefb924072017-08-29 20:21:55 +020056import java.util.concurrent.locks.Lock;
57import java.util.concurrent.locks.ReentrantLock;
58
59import static com.google.common.base.Preconditions.checkNotNull;
Andrea Campanella378e21a2017-06-07 12:09:59 +020060
61/**
62 * Default implementation of the GrpcController.
63 */
64@Component(immediate = true)
65@Service
66public class GrpcControllerImpl implements GrpcController {
67
Carmelo Cascone8d99b172017-07-18 17:26:31 -040068 // Hint: set to true to log all gRPC messages received/sent on all channels.
69 // TODO: make configurable at runtime
70 public static boolean enableMessageLog = false;
71
Carmelo Cascone59f57de2017-07-11 19:55:09 -040072 private static final int CONNECTION_TIMEOUT_SECONDS = 20;
73
Andrea Campanella378e21a2017-06-07 12:09:59 +020074 public static final Logger log = LoggerFactory
75 .getLogger(GrpcControllerImpl.class);
76
77 private Map<GrpcStreamObserverId, GrpcObserverHandler> observers;
78 private Map<GrpcChannelId, ManagedChannel> channels;
79 private Map<GrpcChannelId, ManagedChannelBuilder<?>> channelBuilders;
Carmelo Casconefb924072017-08-29 20:21:55 +020080 private final Map<GrpcChannelId, Lock> channelLocks = Maps.newConcurrentMap();
Andrea Campanella378e21a2017-06-07 12:09:59 +020081
82 @Activate
83 public void activate() {
84 observers = new ConcurrentHashMap<>();
85 channels = new ConcurrentHashMap<>();
86 channelBuilders = new ConcurrentHashMap<>();
87 log.info("Started");
88 }
89
90 @Deactivate
91 public void deactivate() {
92 channels.values().forEach(ManagedChannel::shutdown);
93 observers.clear();
94 channels.clear();
95 channelBuilders.clear();
96 log.info("Stopped");
97 }
98
99 @Override
100 public void addObserver(GrpcStreamObserverId observerId, GrpcObserverHandler grpcObserverHandler) {
101 grpcObserverHandler.bindObserver(channels.get(observerId.serviceId().channelId()));
102 observers.put(observerId, grpcObserverHandler);
103 }
104
105 @Override
106 public void removeObserver(GrpcStreamObserverId observerId) {
107 observers.get(observerId).removeObserver();
108 observers.remove(observerId);
109 }
110
111 @Override
112 public Optional<GrpcObserverHandler> getObserverManager(GrpcStreamObserverId observerId) {
113 return Optional.ofNullable(observers.get(observerId));
114 }
115
116 @Override
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400117 public ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder)
118 throws IOException {
Carmelo Casconefb924072017-08-29 20:21:55 +0200119 checkNotNull(channelId);
120 checkNotNull(channelBuilder);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400121
Carmelo Casconefb924072017-08-29 20:21:55 +0200122 Lock lock = channelLocks.computeIfAbsent(channelId, k -> new ReentrantLock());
123 lock.lock();
124
125 try {
126 if (enableMessageLog) {
127 channelBuilder.intercept(new InternalLogChannelInterceptor(channelId));
128 }
129 ManagedChannel channel = channelBuilder.build();
130 // Forced connection not yet implemented. Use workaround...
131 // channel.getState(true);
132 doDummyMessage(channel);
133 channelBuilders.put(channelId, channelBuilder);
134 channels.put(channelId, channel);
135 return channel;
136 } finally {
137 lock.unlock();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400138 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200139 }
140
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400141 private void doDummyMessage(ManagedChannel channel) throws IOException {
142 DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc.newBlockingStub(channel)
143 .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
144 try {
145 dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse.getDefaultInstance());
146 } catch (StatusRuntimeException e) {
147 if (e.getStatus() != Status.UNIMPLEMENTED) {
148 // UNIMPLEMENTED means that server received our message but doesn't know how to handle it.
149 // Hence, channel is open.
150 throw new IOException(e);
151 }
152 }
153 }
154
155 @Override
156 public boolean isChannelOpen(GrpcChannelId channelId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200157 checkNotNull(channelId);
158
159 Lock lock = channelLocks.computeIfAbsent(channelId, k -> new ReentrantLock());
160 lock.lock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400161
162 try {
Carmelo Casconefb924072017-08-29 20:21:55 +0200163 if (!channels.containsKey(channelId)) {
164 log.warn("Can't check if channel open for unknown channel id {}", channelId);
165 return false;
166 }
167 try {
168 doDummyMessage(channels.get(channelId));
169 return true;
170 } catch (IOException e) {
171 return false;
172 }
173 } finally {
174 lock.unlock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400175 }
176 }
177
Andrea Campanella378e21a2017-06-07 12:09:59 +0200178 @Override
179 public void disconnectChannel(GrpcChannelId channelId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200180 checkNotNull(channelId);
181
182 Lock lock = channelLocks.computeIfAbsent(channelId, k -> new ReentrantLock());
183 lock.lock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400184
185 try {
Carmelo Casconefb924072017-08-29 20:21:55 +0200186 if (!channels.containsKey(channelId)) {
187 // Nothing to do.
188 return;
189 }
190 ManagedChannel channel = channels.get(channelId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400191
Carmelo Casconefb924072017-08-29 20:21:55 +0200192 try {
193 channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
194 } catch (InterruptedException e) {
195 log.warn("Channel {} didn't shut down in time.");
196 channel.shutdownNow();
197 }
198
199 channels.remove(channelId);
200 channelBuilders.remove(channelId);
201 } finally {
202 lock.unlock();
203 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200204 }
205
206 @Override
207 public Map<GrpcChannelId, ManagedChannel> getChannels() {
Carmelo Casconefb924072017-08-29 20:21:55 +0200208 return ImmutableMap.copyOf(channels);
Andrea Campanella378e21a2017-06-07 12:09:59 +0200209 }
210
211 @Override
212 public Collection<ManagedChannel> getChannels(final DeviceId deviceId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200213 checkNotNull(deviceId);
Andrea Campanella378e21a2017-06-07 12:09:59 +0200214 final Set<ManagedChannel> deviceChannels = new HashSet<>();
215 channels.forEach((k, v) -> {
216 if (k.deviceId().equals(deviceId)) {
217 deviceChannels.add(v);
218 }
219 });
220
221 return ImmutableSet.copyOf(deviceChannels);
222 }
223
224 @Override
225 public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200226 checkNotNull(channelId);
227
228 Lock lock = channelLocks.computeIfAbsent(channelId, k -> new ReentrantLock());
229 lock.lock();
230
231 try {
232 return Optional.ofNullable(channels.get(channelId));
233 } finally {
234 lock.unlock();
235 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200236 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400237
238 /**
239 * gRPC client interceptor that logs all messages sent and received.
240 */
241 private final class InternalLogChannelInterceptor implements ClientInterceptor {
242
243 private final GrpcChannelId channelId;
244
245 private InternalLogChannelInterceptor(GrpcChannelId channelId) {
246 this.channelId = channelId;
247 }
248
249 @Override
250 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
251 CallOptions callOptions, Channel channel) {
252 return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(
253 methodDescriptor, callOptions.withoutWaitForReady())) {
254
255 @Override
256 public void sendMessage(ReqT message) {
257 log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}", channelId, methodDescriptor.getFullMethodName(),
258 message.toString());
259 super.sendMessage(message);
260 }
261
262 @Override
263 public void start(Listener<RespT> responseListener, Metadata headers) {
264
265 ClientCall.Listener<RespT> listener = new ForwardingClientCallListener<RespT>() {
266 @Override
267 protected Listener<RespT> delegate() {
268 return responseListener;
269 }
270
271 @Override
272 public void onMessage(RespT message) {
273 log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}", channelId,
274 methodDescriptor.getFullMethodName(),
275 message.toString());
276 super.onMessage(message);
277 }
278 };
279 super.start(listener, headers);
280 }
281 };
282 }
283 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200284}