blob: 3c3820957c7ebc36cdd03a54ee06814433dc5ed5 [file] [log] [blame]
Yi Tseng2a340f72018-11-02 16:52:47 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.grpc.ctl;
18
19import com.google.common.collect.Maps;
20import com.google.common.util.concurrent.Striped;
21import io.grpc.ManagedChannel;
Brian O'Connorc6943832018-12-12 17:27:11 -080022import io.grpc.netty.GrpcSslContexts;
Yi Tseng2a340f72018-11-02 16:52:47 -070023import io.grpc.netty.NettyChannelBuilder;
Brian O'Connorc6943832018-12-12 17:27:11 -080024import io.netty.handler.ssl.SslContext;
25import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
Yi Tseng2a340f72018-11-02 16:52:47 -070026import org.onosproject.event.AbstractListenerManager;
27import org.onosproject.event.Event;
28import org.onosproject.event.EventListener;
29import org.onosproject.grpc.api.GrpcChannelController;
30import org.onosproject.grpc.api.GrpcChannelId;
31import org.onosproject.grpc.api.GrpcClient;
32import org.onosproject.grpc.api.GrpcClientController;
33import org.onosproject.grpc.api.GrpcClientKey;
34import org.onosproject.net.DeviceId;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080035import org.onosproject.net.device.DeviceAgentEvent;
36import org.onosproject.net.device.DeviceAgentListener;
37import org.onosproject.net.provider.ProviderId;
Ray Milkey5739b2c2018-11-06 14:04:51 -080038import org.osgi.service.component.annotations.Activate;
Ray Milkey5739b2c2018-11-06 14:04:51 -080039import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Reference;
41import org.osgi.service.component.annotations.ReferenceCardinality;
Yi Tseng2a340f72018-11-02 16:52:47 -070042import org.slf4j.Logger;
43
Brian O'Connorc6943832018-12-12 17:27:11 -080044import javax.net.ssl.SSLException;
Yi Tseng2a340f72018-11-02 16:52:47 -070045import java.util.Map;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080046import java.util.concurrent.ConcurrentMap;
Yi Tseng2a340f72018-11-02 16:52:47 -070047import java.util.concurrent.locks.Lock;
48import java.util.function.Supplier;
49
50import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Casconea71b8492018-12-17 17:47:50 -080051import static java.lang.String.format;
Yi Tseng2a340f72018-11-02 16:52:47 -070052import static org.slf4j.LoggerFactory.getLogger;
53
54/**
55 * Abstract class of a gRPC based client controller for specific gRPC client
56 * which provides basic gRPC client management and thread safe mechanism.
57 *
58 * @param <C> the gRPC client type
59 * @param <K> the key type of the gRPC client
60 * @param <E> the event type of the gRPC client
61 * @param <L> the event listener of event {@link E}
62 */
Yi Tseng2a340f72018-11-02 16:52:47 -070063public abstract class AbstractGrpcClientController
64 <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
65 extends AbstractListenerManager<E, L>
66 implements GrpcClientController<K, C> {
67
68 /**
69 * The default max inbound message size (MB).
70 */
71 private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
72 private static final int MEGABYTES = 1024 * 1024;
73 private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
74
75 private final Logger log = getLogger(getClass());
76 private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
77 private final Map<K, C> clients = Maps.newHashMap();
78 private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
Carmelo Cascone3977ea42019-02-28 13:43:42 -080079 private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
80 deviceAgentListeners = Maps.newConcurrentMap();
81 private final Class<E> eventClass;
Yi Tseng2a340f72018-11-02 16:52:47 -070082 private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
83
Ray Milkey5739b2c2018-11-06 14:04:51 -080084 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconea46f5542018-12-12 23:41:01 -080085 protected GrpcChannelController grpcChannelController;
Yi Tseng2a340f72018-11-02 16:52:47 -070086
Carmelo Cascone3977ea42019-02-28 13:43:42 -080087 public AbstractGrpcClientController(Class<E> eventClass) {
88 this.eventClass = eventClass;
89 }
90
Yi Tseng2a340f72018-11-02 16:52:47 -070091 @Activate
92 public void activate() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -080093 eventDispatcher.addSink(eventClass, listenerRegistry);
Yi Tseng2a340f72018-11-02 16:52:47 -070094 log.info("Started");
95 }
96
97 @Deactivate
98 public void deactivate() {
Carmelo Cascone3977ea42019-02-28 13:43:42 -080099 eventDispatcher.removeSink(eventClass);
Yi Tseng2a340f72018-11-02 16:52:47 -0700100 clientKeys.keySet().forEach(this::removeClient);
101 clientKeys.clear();
102 clients.clear();
103 channelIds.clear();
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800104 deviceAgentListeners.clear();
Yi Tseng2a340f72018-11-02 16:52:47 -0700105 log.info("Stopped");
106 }
107
108 @Override
109 public boolean createClient(K clientKey) {
110 checkNotNull(clientKey);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800111 return withDeviceLock(() -> doCreateClient(clientKey),
112 clientKey.deviceId());
Yi Tseng2a340f72018-11-02 16:52:47 -0700113 }
114
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800115 private boolean doCreateClient(K clientKey) {
Ray Milkeyfb503a72018-12-19 14:03:17 -0800116 DeviceId deviceId = clientKey.deviceId();
Yi Tseng2a340f72018-11-02 16:52:47 -0700117
118 if (clientKeys.containsKey(deviceId)) {
119 final GrpcClientKey existingKey = clientKeys.get(deviceId);
120 if (clientKey.equals(existingKey)) {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800121 log.debug("Not creating {} as it already exists... (key={})",
122 clientName(clientKey), clientKey);
Yi Tseng2a340f72018-11-02 16:52:47 -0700123 return true;
124 } else {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800125 throw new IllegalArgumentException(format(
126 "A client already exists for device %s (%s)",
127 deviceId, clientKey));
Yi Tseng2a340f72018-11-02 16:52:47 -0700128 }
129 }
Brian O'Connor1a378662018-12-12 17:27:11 -0800130
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800131 final String method = clientKey.requiresSecureChannel()
132 ? "TLS" : "plaintext TCP";
133
134 log.info("Connecting {} client for {} to server at {} using {}...",
135 clientKey.serviceName(), deviceId, clientKey.serveUri(), method);
Brian O'Connorc6943832018-12-12 17:27:11 -0800136
137 SslContext sslContext = null;
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800138 if (clientKey.requiresSecureChannel()) {
Brian O'Connorc6943832018-12-12 17:27:11 -0800139 try {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800140 // FIXME: Accept any server certificate; this is insecure and
141 // should not be used in production
142 sslContext = GrpcSslContexts.forClient().trustManager(
143 InsecureTrustManagerFactory.INSTANCE).build();
Brian O'Connorc6943832018-12-12 17:27:11 -0800144 } catch (SSLException e) {
145 log.error("Failed to build SSL Context", e);
146 return false;
147 }
148 }
149
Carmelo Cascone73f45302019-02-04 23:11:26 -0800150 GrpcChannelId channelId = GrpcChannelId.of(clientKey.toString());
Brian O'Connorc6943832018-12-12 17:27:11 -0800151 NettyChannelBuilder channelBuilder = NettyChannelBuilder
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800152 .forAddress(clientKey.serveUri().getHost(),
153 clientKey.serveUri().getPort())
Brian O'Connorc6943832018-12-12 17:27:11 -0800154 .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800155
Brian O'Connorc6943832018-12-12 17:27:11 -0800156 if (sslContext != null) {
Brian O'Connorc6943832018-12-12 17:27:11 -0800157 channelBuilder
158 .sslContext(sslContext)
159 .useTransportSecurity();
160 } else {
Brian O'Connorc6943832018-12-12 17:27:11 -0800161 channelBuilder.usePlaintext();
162 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700163
Carmelo Casconea71b8492018-12-17 17:47:50 -0800164 final ManagedChannel channel;
Ray Milkeyfb503a72018-12-19 14:03:17 -0800165
Brian O'Connorc6943832018-12-12 17:27:11 -0800166 try {
167 channel = grpcChannelController.connectChannel(channelId, channelBuilder);
168 } catch (Throwable e) {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800169 log.warn("Failed to connect to {} ({}) using {}: {}",
170 deviceId, clientKey.serveUri(), method, e.toString());
171 log.debug("gRPC client connection exception", e);
Brian O'Connorc6943832018-12-12 17:27:11 -0800172 return false;
173 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700174
Carmelo Casconea71b8492018-12-17 17:47:50 -0800175 final C client;
176 try {
177 client = createClientInstance(clientKey, channel);
178 } catch (Throwable e) {
179 log.error("Exception while creating {}", clientName(clientKey), e);
180 grpcChannelController.disconnectChannel(channelId);
Yi Tseng2a340f72018-11-02 16:52:47 -0700181 return false;
182 }
Carmelo Casconea71b8492018-12-17 17:47:50 -0800183
184 if (client == null) {
185 log.error("Unable to create {}, implementation returned null... (key={})",
186 clientName(clientKey), clientKey);
187 grpcChannelController.disconnectChannel(channelId);
188 return false;
189 }
190
Yi Tseng2a340f72018-11-02 16:52:47 -0700191 clientKeys.put(deviceId, clientKey);
192 clients.put(clientKey, client);
193 channelIds.put(deviceId, channelId);
194
195 return true;
196 }
197
198 protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
199
200 @Override
201 public C getClient(DeviceId deviceId) {
202 checkNotNull(deviceId);
203 return withDeviceLock(() -> doGetClient(deviceId), deviceId);
204 }
205
Yi Tsengd7716482018-10-31 15:34:30 -0700206 private C doGetClient(DeviceId deviceId) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700207 if (!clientKeys.containsKey(deviceId)) {
208 return null;
209 }
210 return clients.get(clientKeys.get(deviceId));
211 }
212
213 @Override
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800214 public C getClient(K clientKey) {
215 checkNotNull(clientKey);
216 return clients.get(clientKey);
217 }
218
219 @Override
Yi Tseng2a340f72018-11-02 16:52:47 -0700220 public void removeClient(DeviceId deviceId) {
221 checkNotNull(deviceId);
222 withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
223 }
224
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800225 @Override
226 public void removeClient(K clientKey) {
227 checkNotNull(clientKey);
228 withDeviceLock(() -> doRemoveClient(clientKey), clientKey.deviceId());
229 }
230
Yi Tseng2a340f72018-11-02 16:52:47 -0700231 private Void doRemoveClient(DeviceId deviceId) {
232 if (clientKeys.containsKey(deviceId)) {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800233 doRemoveClient(clientKeys.get(deviceId));
Yi Tseng2a340f72018-11-02 16:52:47 -0700234 }
235 return null;
236 }
237
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800238 private Void doRemoveClient(K clientKey) {
239 if (clients.containsKey(clientKey)) {
240 clients.get(clientKey).shutdown();
241 }
242 if (channelIds.containsKey(clientKey.deviceId())) {
243 grpcChannelController.disconnectChannel(
244 channelIds.get(clientKey.deviceId()));
245 }
246 clientKeys.remove(clientKey.deviceId());
247 clients.remove(clientKey);
248 channelIds.remove(clientKey.deviceId());
249 return null;
Yi Tseng2a340f72018-11-02 16:52:47 -0700250 }
251
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800252 @Override
253 public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
254 checkNotNull(deviceId, "deviceId cannot be null");
255 checkNotNull(deviceId, "providerId cannot be null");
256 checkNotNull(listener, "listener cannot be null");
257 deviceAgentListeners.putIfAbsent(deviceId, Maps.newConcurrentMap());
258 deviceAgentListeners.get(deviceId).put(providerId, listener);
259 }
260
261 @Override
262 public void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId) {
263 checkNotNull(deviceId, "deviceId cannot be null");
264 checkNotNull(providerId, "listener cannot be null");
265 deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
266 listeners.remove(providerId);
267 return listeners.isEmpty() ? null : listeners;
268 });
269 }
270
271 public void postEvent(E event) {
272 checkNotNull(event);
273 post(event);
274 }
275
276 public void postEvent(DeviceAgentEvent event) {
277 // We should have only one event delivery mechanism. We have two now
278 // because we have two different types of events, DeviceAgentEvent and
279 // controller/protocol specific ones (e.g. P4Runtime or gNMI).
280 // TODO: extend device agent event to allow delivery protocol-specific
281 // events, e.g. packet-in
282 checkNotNull(event);
283 if (deviceAgentListeners.containsKey(event.subject())) {
284 deviceAgentListeners.get(event.subject()).values()
285 .forEach(l -> l.event(event));
Yi Tseng2a340f72018-11-02 16:52:47 -0700286 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700287 }
288
289 private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
290 final Lock lock = stripedLocks.get(deviceId);
291 lock.lock();
292 try {
293 return task.get();
294 } finally {
295 lock.unlock();
296 }
297 }
Carmelo Casconea71b8492018-12-17 17:47:50 -0800298
299 private String clientName(GrpcClientKey key) {
300 return format("%s client for %s", key.serviceName(), key.deviceId());
301 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700302}