blob: c0a3c6ba9b6c2da6871e9853068b9b96fac4c752 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.grpc.ctl;
18
19import com.google.common.collect.Maps;
20import com.google.common.util.concurrent.Striped;
21import io.grpc.ManagedChannel;
Brian O'Connorc6943832018-12-12 17:27:11 -080022import io.grpc.netty.GrpcSslContexts;
Yi Tseng2a340f72018-11-02 16:52:47 -070023import io.grpc.netty.NettyChannelBuilder;
Brian O'Connorc6943832018-12-12 17:27:11 -080024import io.netty.handler.ssl.NotSslRecordException;
25import io.netty.handler.ssl.SslContext;
26import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
Yi Tseng2a340f72018-11-02 16:52:47 -070027import org.onosproject.event.AbstractListenerManager;
28import org.onosproject.event.Event;
29import org.onosproject.event.EventListener;
30import org.onosproject.grpc.api.GrpcChannelController;
31import org.onosproject.grpc.api.GrpcChannelId;
32import org.onosproject.grpc.api.GrpcClient;
33import org.onosproject.grpc.api.GrpcClientController;
34import org.onosproject.grpc.api.GrpcClientKey;
35import org.onosproject.net.DeviceId;
Ray Milkey5739b2c2018-11-06 14:04:51 -080036import org.osgi.service.component.annotations.Activate;
Ray Milkey5739b2c2018-11-06 14:04:51 -080037import org.osgi.service.component.annotations.Deactivate;
38import org.osgi.service.component.annotations.Reference;
39import org.osgi.service.component.annotations.ReferenceCardinality;
Yi Tseng2a340f72018-11-02 16:52:47 -070040import org.slf4j.Logger;
41
Brian O'Connorc6943832018-12-12 17:27:11 -080042import javax.net.ssl.SSLException;
Yi Tseng2a340f72018-11-02 16:52:47 -070043import java.util.Map;
44import java.util.concurrent.locks.Lock;
45import java.util.function.Supplier;
46
47import static com.google.common.base.Preconditions.checkNotNull;
Brian O'Connorc6943832018-12-12 17:27:11 -080048import static com.google.common.base.Preconditions.checkState;
Carmelo Casconea71b8492018-12-17 17:47:50 -080049import static java.lang.String.format;
Yi Tseng2a340f72018-11-02 16:52:47 -070050import static org.slf4j.LoggerFactory.getLogger;
51
52/**
53 * Abstract class of a gRPC based client controller for specific gRPC client
54 * which provides basic gRPC client management and thread safe mechanism.
55 *
56 * @param <C> the gRPC client type
57 * @param <K> the key type of the gRPC client
58 * @param <E> the event type of the gRPC client
59 * @param <L> the event listener of event {@link E}
60 */
Yi Tseng2a340f72018-11-02 16:52:47 -070061public abstract class AbstractGrpcClientController
62 <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
63 extends AbstractListenerManager<E, L>
64 implements GrpcClientController<K, C> {
65
66 /**
67 * The default max inbound message size (MB).
68 */
69 private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
70 private static final int MEGABYTES = 1024 * 1024;
71 private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
72
73 private final Logger log = getLogger(getClass());
74 private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
75 private final Map<K, C> clients = Maps.newHashMap();
76 private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
77 private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
78
Ray Milkey5739b2c2018-11-06 14:04:51 -080079 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconea46f5542018-12-12 23:41:01 -080080 protected GrpcChannelController grpcChannelController;
Yi Tseng2a340f72018-11-02 16:52:47 -070081
82 @Activate
83 public void activate() {
84 log.info("Started");
85 }
86
87 @Deactivate
88 public void deactivate() {
89 clientKeys.keySet().forEach(this::removeClient);
90 clientKeys.clear();
91 clients.clear();
92 channelIds.clear();
93 log.info("Stopped");
94 }
95
96 @Override
97 public boolean createClient(K clientKey) {
98 checkNotNull(clientKey);
Brian O'Connorc6943832018-12-12 17:27:11 -080099 /*
100 FIXME we might want to move "useTls" and "fallback" to properties of the netcfg and clientKey
101 For now, we will first try to connect with TLS (accepting any cert), then fall back to
102 plaintext for every device
103 */
104 return withDeviceLock(() -> doCreateClient(clientKey, true, true), clientKey.deviceId());
Yi Tseng2a340f72018-11-02 16:52:47 -0700105 }
106
Brian O'Connorc6943832018-12-12 17:27:11 -0800107 private boolean doCreateClient(K clientKey, boolean useTls, boolean fallbackToPlainText) {
Ray Milkeyfb503a72018-12-19 14:03:17 -0800108 DeviceId deviceId = clientKey.deviceId();
109 String serverAddr = clientKey.serverAddr();
110 int serverPort = clientKey.serverPort();
Yi Tseng2a340f72018-11-02 16:52:47 -0700111
112 if (clientKeys.containsKey(deviceId)) {
113 final GrpcClientKey existingKey = clientKeys.get(deviceId);
114 if (clientKey.equals(existingKey)) {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800115 log.debug("Not creating {} as it already exists... (key={})",
116 clientName(clientKey), clientKey);
Yi Tseng2a340f72018-11-02 16:52:47 -0700117 return true;
118 } else {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800119 log.info("Requested new {} with updated key, removing old client... (oldKey={})",
120 clientName(clientKey), existingKey);
Yi Tseng2a340f72018-11-02 16:52:47 -0700121 doRemoveClient(deviceId);
122 }
123 }
Brian O'Connor1a378662018-12-12 17:27:11 -0800124
Brian O'Connorc6943832018-12-12 17:27:11 -0800125 log.info("Creating client for {} (server={}:{})...", deviceId, serverAddr, serverPort);
126
127 SslContext sslContext = null;
128 if (useTls) {
129 try {
130 // Accept any server certificate; this is insecure and should not be used in production
131 sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
132 } catch (SSLException e) {
133 log.error("Failed to build SSL Context", e);
134 return false;
135 }
136 }
137
Carmelo Cascone73f45302019-02-04 23:11:26 -0800138 GrpcChannelId channelId = GrpcChannelId.of(clientKey.toString());
Brian O'Connorc6943832018-12-12 17:27:11 -0800139 NettyChannelBuilder channelBuilder = NettyChannelBuilder
Yi Tseng2a340f72018-11-02 16:52:47 -0700140 .forAddress(serverAddr, serverPort)
Brian O'Connorc6943832018-12-12 17:27:11 -0800141 .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
142 if (sslContext != null) {
143 log.debug("Using SSL for gRPC connection to {}", deviceId);
144 channelBuilder
145 .sslContext(sslContext)
146 .useTransportSecurity();
147 } else {
148 checkState(!useTls,
149 "Not authorized to use plaintext for gRPC connection to {}", deviceId);
150 log.debug("Using plaintext TCP for gRPC connection to {}", deviceId);
151 channelBuilder.usePlaintext();
152 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700153
Carmelo Casconea71b8492018-12-17 17:47:50 -0800154 final ManagedChannel channel;
Ray Milkeyfb503a72018-12-19 14:03:17 -0800155
Brian O'Connorc6943832018-12-12 17:27:11 -0800156 try {
157 channel = grpcChannelController.connectChannel(channelId, channelBuilder);
158 } catch (Throwable e) {
159 for (Throwable cause = e; cause != null; cause = cause.getCause()) {
160 if (useTls && cause instanceof NotSslRecordException) {
161 // Likely root cause is that server is using plaintext
162 log.info("Failed to connect to server (device={}) using TLS", deviceId);
163 log.debug("TLS connection exception", e);
164 if (fallbackToPlainText) {
165 log.info("Falling back to plaintext for connection to {}", deviceId);
166 return doCreateClient(clientKey, false, false);
167 }
168 }
169 if (!useTls && "Connection reset by peer".equals(cause.getMessage())) {
170 // Not a great signal, but could indicate the server is expected a TLS connection
171 log.error("Failed to connect to server (device={}) using plaintext TCP; is the server using TLS?",
172 deviceId);
173 break;
174 }
175 }
176 log.warn("Unable to connect to gRPC server for {}", deviceId, e);
177 return false;
178 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700179
Carmelo Casconea71b8492018-12-17 17:47:50 -0800180 final C client;
181 try {
182 client = createClientInstance(clientKey, channel);
183 } catch (Throwable e) {
184 log.error("Exception while creating {}", clientName(clientKey), e);
185 grpcChannelController.disconnectChannel(channelId);
Yi Tseng2a340f72018-11-02 16:52:47 -0700186 return false;
187 }
Carmelo Casconea71b8492018-12-17 17:47:50 -0800188
189 if (client == null) {
190 log.error("Unable to create {}, implementation returned null... (key={})",
191 clientName(clientKey), clientKey);
192 grpcChannelController.disconnectChannel(channelId);
193 return false;
194 }
195
Yi Tseng2a340f72018-11-02 16:52:47 -0700196 clientKeys.put(deviceId, clientKey);
197 clients.put(clientKey, client);
198 channelIds.put(deviceId, channelId);
199
200 return true;
201 }
202
203 protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
204
205 @Override
206 public C getClient(DeviceId deviceId) {
207 checkNotNull(deviceId);
208 return withDeviceLock(() -> doGetClient(deviceId), deviceId);
209 }
210
Yi Tsengd7716482018-10-31 15:34:30 -0700211 private C doGetClient(DeviceId deviceId) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700212 if (!clientKeys.containsKey(deviceId)) {
213 return null;
214 }
215 return clients.get(clientKeys.get(deviceId));
216 }
217
218 @Override
219 public void removeClient(DeviceId deviceId) {
220 checkNotNull(deviceId);
221 withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
222 }
223
224 private Void doRemoveClient(DeviceId deviceId) {
225 if (clientKeys.containsKey(deviceId)) {
226 final K clientKey = clientKeys.get(deviceId);
227 clients.get(clientKey).shutdown();
228 grpcChannelController.disconnectChannel(channelIds.get(deviceId));
229 clientKeys.remove(deviceId);
230 clients.remove(clientKey);
231 channelIds.remove(deviceId);
232 }
233 return null;
234 }
235
236 @Override
237 public boolean isReachable(DeviceId deviceId) {
238 checkNotNull(deviceId);
239 return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
240 }
241
Yi Tsengd7716482018-10-31 15:34:30 -0700242 private boolean doIsReachable(DeviceId deviceId) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700243 // Default behaviour checks only the gRPC channel, should
244 // check according to different gRPC service
245 if (!clientKeys.containsKey(deviceId)) {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800246 log.debug("Missing client for {}, cannot check for reachability", deviceId);
Yi Tseng2a340f72018-11-02 16:52:47 -0700247 return false;
248 }
249 return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
250 }
251
252 private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
253 final Lock lock = stripedLocks.get(deviceId);
254 lock.lock();
255 try {
256 return task.get();
257 } finally {
258 lock.unlock();
259 }
260 }
Carmelo Casconea71b8492018-12-17 17:47:50 -0800261
262 private String clientName(GrpcClientKey key) {
263 return format("%s client for %s", key.serviceName(), key.deviceId());
264 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700265}