blob: db29284b0e8e903a82d09789dbf850b20a8ebb74 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.grpc.ctl;
18
19import com.google.common.collect.Maps;
20import com.google.common.util.concurrent.Striped;
21import io.grpc.ManagedChannel;
Brian O'Connor47a3aa62018-12-12 17:27:11 -080022import io.grpc.netty.GrpcSslContexts;
Yi Tseng2a340f72018-11-02 16:52:47 -070023import io.grpc.netty.NettyChannelBuilder;
Brian O'Connor47a3aa62018-12-12 17:27:11 -080024import io.netty.handler.ssl.NotSslRecordException;
25import io.netty.handler.ssl.SslContext;
26import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
Yi Tseng2a340f72018-11-02 16:52:47 -070027import org.onosproject.event.AbstractListenerManager;
28import org.onosproject.event.Event;
29import org.onosproject.event.EventListener;
30import org.onosproject.grpc.api.GrpcChannelController;
31import org.onosproject.grpc.api.GrpcChannelId;
32import org.onosproject.grpc.api.GrpcClient;
33import org.onosproject.grpc.api.GrpcClientController;
34import org.onosproject.grpc.api.GrpcClientKey;
35import org.onosproject.net.DeviceId;
Ray Milkey5739b2c2018-11-06 14:04:51 -080036import org.osgi.service.component.annotations.Activate;
Ray Milkey5739b2c2018-11-06 14:04:51 -080037import org.osgi.service.component.annotations.Deactivate;
38import org.osgi.service.component.annotations.Reference;
39import org.osgi.service.component.annotations.ReferenceCardinality;
Yi Tseng2a340f72018-11-02 16:52:47 -070040import org.slf4j.Logger;
41
Brian O'Connor47a3aa62018-12-12 17:27:11 -080042import javax.net.ssl.SSLException;
Yi Tseng2a340f72018-11-02 16:52:47 -070043import java.io.IOException;
44import java.util.Map;
45import java.util.concurrent.locks.Lock;
46import java.util.function.Supplier;
47
48import static com.google.common.base.Preconditions.checkNotNull;
Brian O'Connor47a3aa62018-12-12 17:27:11 -080049import static com.google.common.base.Preconditions.checkState;
Yi Tseng2a340f72018-11-02 16:52:47 -070050import static org.slf4j.LoggerFactory.getLogger;
51
52/**
53 * Abstract class of a gRPC based client controller for specific gRPC client
54 * which provides basic gRPC client management and thread safe mechanism.
55 *
56 * @param <C> the gRPC client type
57 * @param <K> the key type of the gRPC client
58 * @param <E> the event type of the gRPC client
59 * @param <L> the event listener of event {@link E}
60 */
Yi Tseng2a340f72018-11-02 16:52:47 -070061public abstract class AbstractGrpcClientController
62 <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
63 extends AbstractListenerManager<E, L>
64 implements GrpcClientController<K, C> {
65
66 /**
67 * The default max inbound message size (MB).
68 */
69 private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
70 private static final int MEGABYTES = 1024 * 1024;
71 private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
72
73 private final Logger log = getLogger(getClass());
74 private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
75 private final Map<K, C> clients = Maps.newHashMap();
76 private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
77 private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
78
Ray Milkey5739b2c2018-11-06 14:04:51 -080079 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconea46f5542018-12-12 23:41:01 -080080 protected GrpcChannelController grpcChannelController;
Yi Tseng2a340f72018-11-02 16:52:47 -070081
82 @Activate
83 public void activate() {
84 log.info("Started");
85 }
86
87 @Deactivate
88 public void deactivate() {
89 clientKeys.keySet().forEach(this::removeClient);
90 clientKeys.clear();
91 clients.clear();
92 channelIds.clear();
93 log.info("Stopped");
94 }
95
96 @Override
97 public boolean createClient(K clientKey) {
98 checkNotNull(clientKey);
Brian O'Connor47a3aa62018-12-12 17:27:11 -080099 /*
100 FIXME we might want to move "useTls" and "fallback" to properties of the netcfg and clientKey
101 For now, we will first try to connect with TLS (accepting any cert), then fall back to
102 plaintext for every device
103 */
104 return withDeviceLock(() -> doCreateClient(clientKey, true, true), clientKey.deviceId());
Yi Tseng2a340f72018-11-02 16:52:47 -0700105 }
106
Brian O'Connor47a3aa62018-12-12 17:27:11 -0800107 private boolean doCreateClient(K clientKey, boolean useTls, boolean fallbackToPlainText) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700108 DeviceId deviceId = clientKey.deviceId();
109 String serverAddr = clientKey.serverAddr();
110 int serverPort = clientKey.serverPort();
111
112 if (clientKeys.containsKey(deviceId)) {
113 final GrpcClientKey existingKey = clientKeys.get(deviceId);
114 if (clientKey.equals(existingKey)) {
115 log.debug("Not creating client for {} as it already exists (key={})...",
116 deviceId, clientKey);
117 return true;
118 } else {
119 log.info("Requested client for {} with new " +
120 "endpoint, removing old client (key={})...",
121 deviceId, clientKey);
122 doRemoveClient(deviceId);
123 }
124 }
Brian O'Connor47a3aa62018-12-12 17:27:11 -0800125 log.info("Creating client for {} (server={}:{})...", deviceId, serverAddr, serverPort);
126
127 SslContext sslContext = null;
128 if (useTls) {
129 try {
130 // Accept any server certificate; this is insecure and should not be used in production
131 sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
132 } catch (SSLException e) {
133 log.error("Failed to build SSL Context", e);
134 return false;
135 }
136 }
137
Yi Tseng2a340f72018-11-02 16:52:47 -0700138 GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(), clientKey.toString());
Brian O'Connor47a3aa62018-12-12 17:27:11 -0800139 NettyChannelBuilder channelBuilder = NettyChannelBuilder
Yi Tseng2a340f72018-11-02 16:52:47 -0700140 .forAddress(serverAddr, serverPort)
Brian O'Connor47a3aa62018-12-12 17:27:11 -0800141 .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
142 if (sslContext != null) {
143 log.debug("Using SSL for gRPC connection to {}", deviceId);
144 channelBuilder
145 .sslContext(sslContext)
146 .useTransportSecurity();
147 } else {
148 checkState(!useTls,
149 "Not authorized to use plaintext for gRPC connection to {}", deviceId);
150 log.debug("Using plaintext TCP for gRPC connection to {}", deviceId);
151 channelBuilder.usePlaintext();
152 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700153
154 ManagedChannel channel;
155 try {
156 channel = grpcChannelController.connectChannel(channelId, channelBuilder);
157 } catch (IOException e) {
Brian O'Connor47a3aa62018-12-12 17:27:11 -0800158 for (Throwable cause = e; cause != null; cause = cause.getCause()) {
159 if (useTls && cause instanceof NotSslRecordException) {
160 // Likely root cause is that server is using plaintext
161 log.info("Failed to connect to server (device={}) using TLS", deviceId);
162 log.debug("TLS connection exception", e);
163 if (fallbackToPlainText) {
164 log.info("Falling back to plaintext for connection to {}", deviceId);
165 return doCreateClient(clientKey, false, false);
166 }
167 }
168 if (!useTls && "Connection reset by peer".equals(cause.getMessage())) {
169 // Not a great signal, but could indicate the server is expected a TLS connection
170 log.error("Failed to connect to server (device={}) using plaintext TCP; is the server using TLS?",
171 deviceId);
172 break;
173 }
174 }
175 log.warn("Unable to connect to gRPC server for {}", deviceId, e);
Yi Tseng2a340f72018-11-02 16:52:47 -0700176 return false;
177 }
178
179 C client = createClientInstance(clientKey, channel);
180 if (client == null) {
181 log.warn("Cannot create client for {} (key={})", deviceId, clientKey);
182 return false;
183 }
184 clientKeys.put(deviceId, clientKey);
185 clients.put(clientKey, client);
186 channelIds.put(deviceId, channelId);
187
188 return true;
189 }
190
191 protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
192
193 @Override
194 public C getClient(DeviceId deviceId) {
195 checkNotNull(deviceId);
196 return withDeviceLock(() -> doGetClient(deviceId), deviceId);
197 }
198
Yi Tsengd7716482018-10-31 15:34:30 -0700199 private C doGetClient(DeviceId deviceId) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700200 if (!clientKeys.containsKey(deviceId)) {
201 return null;
202 }
203 return clients.get(clientKeys.get(deviceId));
204 }
205
206 @Override
207 public void removeClient(DeviceId deviceId) {
208 checkNotNull(deviceId);
209 withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
210 }
211
212 private Void doRemoveClient(DeviceId deviceId) {
213 if (clientKeys.containsKey(deviceId)) {
214 final K clientKey = clientKeys.get(deviceId);
215 clients.get(clientKey).shutdown();
216 grpcChannelController.disconnectChannel(channelIds.get(deviceId));
217 clientKeys.remove(deviceId);
218 clients.remove(clientKey);
219 channelIds.remove(deviceId);
220 }
221 return null;
222 }
223
224 @Override
225 public boolean isReachable(DeviceId deviceId) {
226 checkNotNull(deviceId);
227 return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
228 }
229
Yi Tsengd7716482018-10-31 15:34:30 -0700230 private boolean doIsReachable(DeviceId deviceId) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700231 // Default behaviour checks only the gRPC channel, should
232 // check according to different gRPC service
233 if (!clientKeys.containsKey(deviceId)) {
234 log.debug("No client for {}, can't check for reachability", deviceId);
235 return false;
236 }
237 return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
238 }
239
240 private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
241 final Lock lock = stripedLocks.get(deviceId);
242 lock.lock();
243 try {
244 return task.get();
245 } finally {
246 lock.unlock();
247 }
248 }
249}