blob: 4f0403af8f00e7d3ae55601ee9b63b9c677d4c44 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.grpc.ctl;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Striped;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.NotSslRecordException;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.event.Event;
import org.onosproject.event.EventListener;
import org.onosproject.grpc.api.GrpcChannelController;
import org.onosproject.grpc.api.GrpcChannelId;
import org.onosproject.grpc.api.GrpcClient;
import org.onosproject.grpc.api.GrpcClientController;
import org.onosproject.grpc.api.GrpcClientKey;
import org.onosproject.net.DeviceId;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;

import javax.net.ssl.SSLException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;

52/**
53 * Abstract class of a gRPC based client controller for specific gRPC client
54 * which provides basic gRPC client management and thread safe mechanism.
55 *
56 * @param <C> the gRPC client type
57 * @param <K> the key type of the gRPC client
58 * @param <E> the event type of the gRPC client
59 * @param <L> the event listener of event {@link E}
60 */
Yi Tseng2a340f72018-11-02 16:52:47 -070061public abstract class AbstractGrpcClientController
62 <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
63 extends AbstractListenerManager<E, L>
64 implements GrpcClientController<K, C> {
65
66 /**
67 * The default max inbound message size (MB).
68 */
69 private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
70 private static final int MEGABYTES = 1024 * 1024;
71 private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
72
73 private final Logger log = getLogger(getClass());
74 private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
75 private final Map<K, C> clients = Maps.newHashMap();
76 private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
77 private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
78
Ray Milkey5739b2c2018-11-06 14:04:51 -080079 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconea46f5542018-12-12 23:41:01 -080080 protected GrpcChannelController grpcChannelController;
Yi Tseng2a340f72018-11-02 16:52:47 -070081
82 @Activate
83 public void activate() {
84 log.info("Started");
85 }
86
87 @Deactivate
88 public void deactivate() {
89 clientKeys.keySet().forEach(this::removeClient);
90 clientKeys.clear();
91 clients.clear();
92 channelIds.clear();
93 log.info("Stopped");
94 }
95
96 @Override
97 public boolean createClient(K clientKey) {
98 checkNotNull(clientKey);
Brian O'Connor1a378662018-12-12 17:27:11 -080099 /*
100 FIXME we might want to move "useTls" and "fallback" to properties of the netcfg and clientKey
101 For now, we will first try to connect with TLS (accepting any cert), then fall back to
102 plaintext for every device
103 */
104 return withDeviceLock(() -> doCreateClient(clientKey, true, true), clientKey.deviceId());
Yi Tseng2a340f72018-11-02 16:52:47 -0700105 }
106
Brian O'Connor1a378662018-12-12 17:27:11 -0800107 private boolean doCreateClient(K clientKey, boolean useTls, boolean fallbackToPlainText) {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800108 final DeviceId deviceId = clientKey.deviceId();
109 final String serverAddr = clientKey.serverAddr();
110 final int serverPort = clientKey.serverPort();
Yi Tseng2a340f72018-11-02 16:52:47 -0700111
112 if (clientKeys.containsKey(deviceId)) {
113 final GrpcClientKey existingKey = clientKeys.get(deviceId);
114 if (clientKey.equals(existingKey)) {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800115 log.debug("Not creating {} as it already exists... (key={})",
116 clientName(clientKey), clientKey);
Yi Tseng2a340f72018-11-02 16:52:47 -0700117 return true;
118 } else {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800119 log.info("Requested new {} with updated key, removing old client... (oldKey={})",
120 clientName(clientKey), existingKey);
Yi Tseng2a340f72018-11-02 16:52:47 -0700121 doRemoveClient(deviceId);
122 }
123 }
Brian O'Connor1a378662018-12-12 17:27:11 -0800124
Carmelo Casconea71b8492018-12-17 17:47:50 -0800125 log.info("Creating new {}... (key={}, useTls={}, fallbackToPlainText={})",
126 clientName(clientKey), clientKey, useTls,
127 fallbackToPlainText);
Brian O'Connor1a378662018-12-12 17:27:11 -0800128
Carmelo Casconea71b8492018-12-17 17:47:50 -0800129 final GrpcChannelId channelId = GrpcChannelId.of(
130 clientKey.deviceId(), clientKey.toString());
131 final NettyChannelBuilder channelBuilder = NettyChannelBuilder
Yi Tseng2a340f72018-11-02 16:52:47 -0700132 .forAddress(serverAddr, serverPort)
Brian O'Connor1a378662018-12-12 17:27:11 -0800133 .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
Carmelo Casconea71b8492018-12-17 17:47:50 -0800134
135 if (useTls) {
136 // FIXME: logic to create/manage SSL properties of a channel builder
137 // should belong to the GrpcChannelController.
138 log.debug("Using SSL for {}", clientName(clientKey), deviceId);
139 final SslContext sslContext;
140 try {
141 // Accept any server certificate; this is insecure and should
142 // not be used in production
143 sslContext = GrpcSslContexts.forClient()
144 .trustManager(InsecureTrustManagerFactory.INSTANCE)
145 .build();
146 } catch (SSLException e) {
147 log.error("Failed to build SSL context for {}", clientName(clientKey), e);
148 return false;
149 }
Brian O'Connor1a378662018-12-12 17:27:11 -0800150 channelBuilder
151 .sslContext(sslContext)
152 .useTransportSecurity();
153 } else {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800154 log.debug("Using plaintext TCP for {}", clientName(clientKey));
Brian O'Connor1a378662018-12-12 17:27:11 -0800155 channelBuilder.usePlaintext();
156 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700157
Carmelo Casconea71b8492018-12-17 17:47:50 -0800158 final ManagedChannel channel;
Yi Tseng2a340f72018-11-02 16:52:47 -0700159 try {
160 channel = grpcChannelController.connectChannel(channelId, channelBuilder);
Carmelo Casconea71b8492018-12-17 17:47:50 -0800161 } catch (Throwable e) {
Brian O'Connor1a378662018-12-12 17:27:11 -0800162 for (Throwable cause = e; cause != null; cause = cause.getCause()) {
163 if (useTls && cause instanceof NotSslRecordException) {
164 // Likely root cause is that server is using plaintext
Carmelo Casconea71b8492018-12-17 17:47:50 -0800165 log.warn("Failed to connect {} using TLS", clientName(clientKey));
Brian O'Connor1a378662018-12-12 17:27:11 -0800166 log.debug("TLS connection exception", e);
167 if (fallbackToPlainText) {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800168 log.info("Falling back to plaintext TCP for {}", clientName(clientKey));
Brian O'Connor1a378662018-12-12 17:27:11 -0800169 return doCreateClient(clientKey, false, false);
170 }
171 }
172 if (!useTls && "Connection reset by peer".equals(cause.getMessage())) {
173 // Not a great signal, but could indicate the server is expected a TLS connection
Carmelo Casconea71b8492018-12-17 17:47:50 -0800174 log.warn("Failed to connect {} using plaintext TCP; " +
175 "is the server using TLS?",
176 clientName(clientKey));
Brian O'Connor1a378662018-12-12 17:27:11 -0800177 break;
178 }
179 }
Carmelo Casconea71b8492018-12-17 17:47:50 -0800180 if (e instanceof StatusRuntimeException) {
181 log.warn("Unable to connect {}: {}", clientName(clientKey), e.getMessage());
182 log.debug("Connection exception", e);
183 } else {
184 log.error("Exception while connecting {}", clientName(clientKey), e);
185 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700186 return false;
187 }
188
Carmelo Casconea71b8492018-12-17 17:47:50 -0800189 final C client;
190 try {
191 client = createClientInstance(clientKey, channel);
192 } catch (Throwable e) {
193 log.error("Exception while creating {}", clientName(clientKey), e);
194 grpcChannelController.disconnectChannel(channelId);
Yi Tseng2a340f72018-11-02 16:52:47 -0700195 return false;
196 }
Carmelo Casconea71b8492018-12-17 17:47:50 -0800197
198 if (client == null) {
199 log.error("Unable to create {}, implementation returned null... (key={})",
200 clientName(clientKey), clientKey);
201 grpcChannelController.disconnectChannel(channelId);
202 return false;
203 }
204
Yi Tseng2a340f72018-11-02 16:52:47 -0700205 clientKeys.put(deviceId, clientKey);
206 clients.put(clientKey, client);
207 channelIds.put(deviceId, channelId);
208
209 return true;
210 }
211
212 protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
213
214 @Override
215 public C getClient(DeviceId deviceId) {
216 checkNotNull(deviceId);
217 return withDeviceLock(() -> doGetClient(deviceId), deviceId);
218 }
219
Yi Tsengd7716482018-10-31 15:34:30 -0700220 private C doGetClient(DeviceId deviceId) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700221 if (!clientKeys.containsKey(deviceId)) {
222 return null;
223 }
224 return clients.get(clientKeys.get(deviceId));
225 }
226
227 @Override
228 public void removeClient(DeviceId deviceId) {
229 checkNotNull(deviceId);
230 withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
231 }
232
233 private Void doRemoveClient(DeviceId deviceId) {
234 if (clientKeys.containsKey(deviceId)) {
235 final K clientKey = clientKeys.get(deviceId);
236 clients.get(clientKey).shutdown();
237 grpcChannelController.disconnectChannel(channelIds.get(deviceId));
238 clientKeys.remove(deviceId);
239 clients.remove(clientKey);
240 channelIds.remove(deviceId);
241 }
242 return null;
243 }
244
245 @Override
246 public boolean isReachable(DeviceId deviceId) {
247 checkNotNull(deviceId);
248 return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
249 }
250
Yi Tsengd7716482018-10-31 15:34:30 -0700251 private boolean doIsReachable(DeviceId deviceId) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700252 // Default behaviour checks only the gRPC channel, should
253 // check according to different gRPC service
254 if (!clientKeys.containsKey(deviceId)) {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800255 log.debug("Missing client for {}, cannot check for reachability", deviceId);
Yi Tseng2a340f72018-11-02 16:52:47 -0700256 return false;
257 }
258 return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
259 }
260
261 private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
262 final Lock lock = stripedLocks.get(deviceId);
263 lock.lock();
264 try {
265 return task.get();
266 } finally {
267 lock.unlock();
268 }
269 }
Carmelo Casconea71b8492018-12-17 17:47:50 -0800270
271 private String clientName(GrpcClientKey key) {
272 return format("%s client for %s", key.serviceName(), key.deviceId());
273 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700274}