blob: b5b10db8b690f962d2eed3f524b0d0e857036799 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.grpc.ctl;
18
Carmelo Cascone158b8c42018-07-04 19:42:37 +020019import com.google.common.util.concurrent.Striped;
Carmelo Casconeb9536692019-05-28 18:15:23 -070020import io.grpc.LoadBalancerRegistry;
Andrea Campanella378e21a2017-06-07 12:09:59 +020021import io.grpc.ManagedChannel;
22import io.grpc.ManagedChannelBuilder;
Carmelo Casconeb9536692019-05-28 18:15:23 -070023import io.grpc.NameResolverRegistry;
24import io.grpc.internal.DnsNameResolverProvider;
25import io.grpc.internal.PickFirstLoadBalancerProvider;
Carmelo Casconec2be50a2019-04-10 00:15:39 -070026import io.grpc.netty.GrpcSslContexts;
27import io.grpc.netty.NettyChannelBuilder;
28import io.netty.handler.ssl.SslContext;
29import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020030import org.onlab.util.Tools;
31import org.onosproject.cfg.ComponentConfigService;
Yi Tseng2a340f72018-11-02 16:52:47 -070032import org.onosproject.grpc.api.GrpcChannelController;
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020033import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070034import org.osgi.service.component.annotations.Activate;
35import org.osgi.service.component.annotations.Component;
36import org.osgi.service.component.annotations.Deactivate;
37import org.osgi.service.component.annotations.Modified;
38import org.osgi.service.component.annotations.Reference;
39import org.osgi.service.component.annotations.ReferenceCardinality;
Andrea Campanella378e21a2017-06-07 12:09:59 +020040import org.slf4j.Logger;
41import org.slf4j.LoggerFactory;
42
Carmelo Casconec2be50a2019-04-10 00:15:39 -070043import javax.net.ssl.SSLException;
44import java.net.URI;
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020045import java.util.Dictionary;
Andrea Campanella378e21a2017-06-07 12:09:59 +020046import java.util.Map;
47import java.util.Optional;
Andrea Campanella378e21a2017-06-07 12:09:59 +020048import java.util.concurrent.ConcurrentHashMap;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040049import java.util.concurrent.TimeUnit;
Carmelo Cascone73f45302019-02-04 23:11:26 -080050import java.util.concurrent.atomic.AtomicBoolean;
Carmelo Casconefb924072017-08-29 20:21:55 +020051import java.util.concurrent.locks.Lock;
Carmelo Casconefb924072017-08-29 20:21:55 +020052
Carmelo Casconec2be50a2019-04-10 00:15:39 -070053import static com.google.common.base.Preconditions.checkArgument;
Carmelo Casconefb924072017-08-29 20:21:55 +020054import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Casconec2be50a2019-04-10 00:15:39 -070055import static com.google.common.base.Strings.isNullOrEmpty;
Carmelo Cascone6d57f322018-12-13 23:15:17 -080056import static java.lang.String.format;
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -070057import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG;
58import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG_DEFAULT;
Andrea Campanella378e21a2017-06-07 12:09:59 +020059
60/**
Yi Tseng2a340f72018-11-02 16:52:47 -070061 * Default implementation of the GrpcChannelController.
Andrea Campanella378e21a2017-06-07 12:09:59 +020062 */
Ray Milkey5739b2c2018-11-06 14:04:51 -080063@Component(immediate = true, service = GrpcChannelController.class,
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -070064 property = {
Carmelo Cascone73f45302019-02-04 23:11:26 -080065 ENABLE_MESSAGE_LOG + ":Boolean=" + ENABLE_MESSAGE_LOG_DEFAULT,
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -070066 })
Yi Tseng2a340f72018-11-02 16:52:47 -070067public class GrpcChannelControllerImpl implements GrpcChannelController {
Andrea Campanella378e21a2017-06-07 12:09:59 +020068
Carmelo Casconec2be50a2019-04-10 00:15:39 -070069 private static final String GRPC = "grpc";
70 private static final String GRPCS = "grpcs";
71
72 private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
pierventre8e489de2021-06-29 14:04:42 +020073 // The maximum metadata size in Megabytes that a P4Runtime client should accept.
74 // This is necessary, because the P4Runtime protocol returns individual errors to
75 // requests in a batch all wrapped in a single status, which counts towards the
76 // metadata size limit. For large batches, this easily exceeds the default of
77 // 8KB. According to the tests done with Stratum, 4MB will support batches of
78 // around 40000 entries, assuming 100 bytes per error, without exceeding the
79 // maximum metadata size. Setting here 10 times higher.
80 private static final int DEFAULT_MAX_INBOUND_META_SIZE = 40;
Carmelo Casconec2be50a2019-04-10 00:15:39 -070081 private static final int MEGABYTES = 1024 * 1024;
82
Carmelo Casconeb9536692019-05-28 18:15:23 -070083 private static final PickFirstLoadBalancerProvider PICK_FIRST_LOAD_BALANCER_PROVIDER =
84 new PickFirstLoadBalancerProvider();
85 private static final DnsNameResolverProvider DNS_NAME_RESOLVER_PROVIDER =
86 new DnsNameResolverProvider();
87
Ray Milkeyd84f89b2018-08-17 14:54:17 -070088 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020089 protected ComponentConfigService componentConfigService;
90
Carmelo Cascone73f45302019-02-04 23:11:26 -080091 /**
92 * Indicates whether to log gRPC messages.
93 */
94 private final AtomicBoolean enableMessageLog = new AtomicBoolean(
95 ENABLE_MESSAGE_LOG_DEFAULT);
Carmelo Cascone8d99b172017-07-18 17:26:31 -040096
Carmelo Cascone47a853b2018-01-05 02:40:58 +010097 private final Logger log = LoggerFactory.getLogger(getClass());
Carmelo Cascone59f57de2017-07-11 19:55:09 -040098
Carmelo Casconec2be50a2019-04-10 00:15:39 -070099 private Map<URI, ManagedChannel> channels;
100 private Map<URI, GrpcLoggingInterceptor> interceptors;
Carmelo Cascone73f45302019-02-04 23:11:26 -0800101
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200102 private final Striped<Lock> channelLocks = Striped.lock(30);
Andrea Campanella378e21a2017-06-07 12:09:59 +0200103
104 @Activate
105 public void activate() {
Andrea Campanellaa74bdba2018-05-15 16:45:00 +0200106 componentConfigService.registerProperties(getClass());
Andrea Campanella378e21a2017-06-07 12:09:59 +0200107 channels = new ConcurrentHashMap<>();
Carmelo Cascone73f45302019-02-04 23:11:26 -0800108 interceptors = new ConcurrentHashMap<>();
Carmelo Casconeb9536692019-05-28 18:15:23 -0700109 LoadBalancerRegistry.getDefaultRegistry()
110 .register(PICK_FIRST_LOAD_BALANCER_PROVIDER);
111 NameResolverRegistry.getDefaultRegistry()
112 .register(DNS_NAME_RESOLVER_PROVIDER);
Andrea Campanella378e21a2017-06-07 12:09:59 +0200113 log.info("Started");
114 }
115
Andrea Campanellaa74bdba2018-05-15 16:45:00 +0200116 @Modified
117 public void modified(ComponentContext context) {
118 if (context != null) {
119 Dictionary<?, ?> properties = context.getProperties();
Carmelo Cascone73f45302019-02-04 23:11:26 -0800120 enableMessageLog.set(Tools.isPropertyEnabled(
121 properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT));
122 log.info("Configured. Logging of gRPC messages is {}",
Carmelo Cascone62d5c2e2019-03-07 18:53:17 -0800123 enableMessageLog.get() ? "ENABLED" : "DISABLED");
Andrea Campanellaa74bdba2018-05-15 16:45:00 +0200124 }
125 }
126
Andrea Campanella378e21a2017-06-07 12:09:59 +0200127 @Deactivate
128 public void deactivate() {
Carmelo Casconeb9536692019-05-28 18:15:23 -0700129 LoadBalancerRegistry.getDefaultRegistry()
130 .deregister(PICK_FIRST_LOAD_BALANCER_PROVIDER);
131 NameResolverRegistry.getDefaultRegistry()
132 .register(DNS_NAME_RESOLVER_PROVIDER);
Andrea Campanellaa74bdba2018-05-15 16:45:00 +0200133 componentConfigService.unregisterProperties(getClass(), false);
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800134 channels.values().forEach(ManagedChannel::shutdownNow);
Andrea Campanella378e21a2017-06-07 12:09:59 +0200135 channels.clear();
Carmelo Cascone73f45302019-02-04 23:11:26 -0800136 channels = null;
137 interceptors.values().forEach(GrpcLoggingInterceptor::close);
138 interceptors.clear();
139 interceptors = null;
Andrea Campanella378e21a2017-06-07 12:09:59 +0200140 log.info("Stopped");
141 }
142
143 @Override
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700144 public ManagedChannel create(URI channelUri) {
145 return create(channelUri, makeChannelBuilder(channelUri));
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400146 }
147
148 @Override
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700149 public ManagedChannel create(URI channelUri, ManagedChannelBuilder<?> channelBuilder) {
150 checkNotNull(channelUri);
151 checkNotNull(channelBuilder);
Carmelo Casconefb924072017-08-29 20:21:55 +0200152
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700153 channelLocks.get(channelUri).lock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400154 try {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700155 if (channels.containsKey(channelUri)) {
156 throw new IllegalArgumentException(format(
157 "A channel with ID '%s' already exists", channelUri));
Carmelo Casconefb924072017-08-29 20:21:55 +0200158 }
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700159
160 log.info("Creating new gRPC channel {}...", channelUri);
161
162 final GrpcLoggingInterceptor interceptor = new GrpcLoggingInterceptor(
163 channelUri, enableMessageLog);
164 channelBuilder.intercept(interceptor);
165
166 final ManagedChannel channel = channelBuilder.build();
167
168 channels.put(channelUri, channelBuilder.build());
169 interceptors.put(channelUri, interceptor);
170
171 return channel;
172 } finally {
173 channelLocks.get(channelUri).unlock();
174 }
175 }
176
177 private NettyChannelBuilder makeChannelBuilder(URI channelUri) {
178
179 checkArgument(channelUri.getScheme().equals(GRPC)
180 || channelUri.getScheme().equals(GRPCS),
181 format("Server URI scheme must be %s or %s", GRPC, GRPCS));
182 checkArgument(!isNullOrEmpty(channelUri.getHost()),
183 "Server host address should not be empty");
184 checkArgument(channelUri.getPort() > 0 && channelUri.getPort() <= 65535,
185 "Invalid server port");
186
187 final boolean useTls = channelUri.getScheme().equals(GRPCS);
188
189 final NettyChannelBuilder channelBuilder = NettyChannelBuilder
Carmelo Casconeb9536692019-05-28 18:15:23 -0700190 .forAddress(channelUri.getHost(), channelUri.getPort())
191 .nameResolverFactory(DNS_NAME_RESOLVER_PROVIDER)
192 .defaultLoadBalancingPolicy(
193 PICK_FIRST_LOAD_BALANCER_PROVIDER.getPolicyName())
194 .maxInboundMessageSize(
pierventre8e489de2021-06-29 14:04:42 +0200195 DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES)
196 .maxInboundMetadataSize(
197 DEFAULT_MAX_INBOUND_META_SIZE * MEGABYTES);
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700198
199 if (useTls) {
200 try {
201 // Accept any server certificate; this is insecure and
202 // should not be used in production.
203 final SslContext sslContext = GrpcSslContexts.forClient().trustManager(
204 InsecureTrustManagerFactory.INSTANCE).build();
205 channelBuilder.sslContext(sslContext).useTransportSecurity();
206 } catch (SSLException e) {
207 log.error("Failed to build SSL context", e);
208 return null;
209 }
210 } else {
211 channelBuilder.usePlaintext();
212 }
213
214 return channelBuilder;
215 }
216
217 @Override
218 public void destroy(URI channelUri) {
219 checkNotNull(channelUri);
220
221 channelLocks.get(channelUri).lock();
222 try {
pierventref92de512021-05-18 18:06:40 +0200223 log.info("Destroying gRPC channel {}...", channelUri);
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700224 final ManagedChannel channel = channels.remove(channelUri);
225 if (channel != null) {
226 shutdownNowAndWait(channel, channelUri);
227 }
228 final GrpcLoggingInterceptor interceptor = interceptors.remove(channelUri);
Carmelo Cascone73f45302019-02-04 23:11:26 -0800229 if (interceptor != null) {
230 interceptor.close();
231 }
Carmelo Casconefb924072017-08-29 20:21:55 +0200232 } finally {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700233 channelLocks.get(channelUri).unlock();
Carmelo Casconefb924072017-08-29 20:21:55 +0200234 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200235 }
236
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700237 private void shutdownNowAndWait(ManagedChannel channel, URI channelUri) {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800238 try {
239 if (!channel.shutdownNow()
240 .awaitTermination(5, TimeUnit.SECONDS)) {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700241 log.error("Channel {} did not terminate properly",
242 channelUri);
Carmelo Casconea71b8492018-12-17 17:47:50 -0800243 }
244 } catch (InterruptedException e) {
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700245 log.warn("Channel {} didn't shutdown in time", channelUri);
Carmelo Casconea71b8492018-12-17 17:47:50 -0800246 Thread.currentThread().interrupt();
247 }
248 }
249
Andrea Campanella378e21a2017-06-07 12:09:59 +0200250 @Override
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700251 public Optional<ManagedChannel> get(URI channelUri) {
252 checkNotNull(channelUri);
253 return Optional.ofNullable(channels.get(channelUri));
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800254 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200255}