blob: e5f4884a5dee654f974d583893aadfdc3a5b135a [file] [log] [blame]
Yi Tseng2a340f72018-11-02 16:52:47 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.grpc.ctl;
18
19import com.google.common.collect.Maps;
20import com.google.common.util.concurrent.Striped;
21import io.grpc.ManagedChannel;
Ray Milkeyfb503a72018-12-19 14:03:17 -080022import io.grpc.ManagedChannelBuilder;
Yi Tseng2a340f72018-11-02 16:52:47 -070023import io.grpc.netty.NettyChannelBuilder;
Yi Tseng2a340f72018-11-02 16:52:47 -070024import org.onosproject.event.AbstractListenerManager;
25import org.onosproject.event.Event;
26import org.onosproject.event.EventListener;
27import org.onosproject.grpc.api.GrpcChannelController;
28import org.onosproject.grpc.api.GrpcChannelId;
29import org.onosproject.grpc.api.GrpcClient;
30import org.onosproject.grpc.api.GrpcClientController;
31import org.onosproject.grpc.api.GrpcClientKey;
32import org.onosproject.net.DeviceId;
Ray Milkey5739b2c2018-11-06 14:04:51 -080033import org.osgi.service.component.annotations.Activate;
Ray Milkey5739b2c2018-11-06 14:04:51 -080034import org.osgi.service.component.annotations.Deactivate;
35import org.osgi.service.component.annotations.Reference;
36import org.osgi.service.component.annotations.ReferenceCardinality;
Yi Tseng2a340f72018-11-02 16:52:47 -070037import org.slf4j.Logger;
38
Yi Tseng2a340f72018-11-02 16:52:47 -070039import java.util.Map;
40import java.util.concurrent.locks.Lock;
41import java.util.function.Supplier;
42
43import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Casconea71b8492018-12-17 17:47:50 -080044import static java.lang.String.format;
Yi Tseng2a340f72018-11-02 16:52:47 -070045import static org.slf4j.LoggerFactory.getLogger;
46
47/**
48 * Abstract class of a gRPC based client controller for specific gRPC client
49 * which provides basic gRPC client management and thread safe mechanism.
50 *
51 * @param <C> the gRPC client type
52 * @param <K> the key type of the gRPC client
53 * @param <E> the event type of the gRPC client
54 * @param <L> the event listener of event {@link E}
55 */
Yi Tseng2a340f72018-11-02 16:52:47 -070056public abstract class AbstractGrpcClientController
57 <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
58 extends AbstractListenerManager<E, L>
59 implements GrpcClientController<K, C> {
60
61 /**
62 * The default max inbound message size (MB).
63 */
64 private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
65 private static final int MEGABYTES = 1024 * 1024;
66 private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
67
68 private final Logger log = getLogger(getClass());
69 private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
70 private final Map<K, C> clients = Maps.newHashMap();
71 private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
72 private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
73
Ray Milkey5739b2c2018-11-06 14:04:51 -080074 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconea46f5542018-12-12 23:41:01 -080075 protected GrpcChannelController grpcChannelController;
Yi Tseng2a340f72018-11-02 16:52:47 -070076
77 @Activate
78 public void activate() {
79 log.info("Started");
80 }
81
82 @Deactivate
83 public void deactivate() {
84 clientKeys.keySet().forEach(this::removeClient);
85 clientKeys.clear();
86 clients.clear();
87 channelIds.clear();
88 log.info("Stopped");
89 }
90
91 @Override
92 public boolean createClient(K clientKey) {
93 checkNotNull(clientKey);
Ray Milkeyfb503a72018-12-19 14:03:17 -080094 return withDeviceLock(() -> doCreateClient(clientKey), clientKey.deviceId());
Yi Tseng2a340f72018-11-02 16:52:47 -070095 }
96
Ray Milkeyfb503a72018-12-19 14:03:17 -080097
98 private boolean doCreateClient(K clientKey) {
99 DeviceId deviceId = clientKey.deviceId();
100 String serverAddr = clientKey.serverAddr();
101 int serverPort = clientKey.serverPort();
Yi Tseng2a340f72018-11-02 16:52:47 -0700102
103 if (clientKeys.containsKey(deviceId)) {
104 final GrpcClientKey existingKey = clientKeys.get(deviceId);
105 if (clientKey.equals(existingKey)) {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800106 log.debug("Not creating {} as it already exists... (key={})",
107 clientName(clientKey), clientKey);
Yi Tseng2a340f72018-11-02 16:52:47 -0700108 return true;
109 } else {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800110 log.info("Requested new {} with updated key, removing old client... (oldKey={})",
111 clientName(clientKey), existingKey);
Yi Tseng2a340f72018-11-02 16:52:47 -0700112 doRemoveClient(deviceId);
113 }
114 }
Brian O'Connor1a378662018-12-12 17:27:11 -0800115
Ray Milkeyfb503a72018-12-19 14:03:17 -0800116 log.info("Creating client for {} (server={}:{})...",
117 deviceId, serverAddr, serverPort);
118 GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(), clientKey.toString());
119 ManagedChannelBuilder channelBuilder = NettyChannelBuilder
Yi Tseng2a340f72018-11-02 16:52:47 -0700120 .forAddress(serverAddr, serverPort)
Ray Milkeyfb503a72018-12-19 14:03:17 -0800121 .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES)
122 .usePlaintext();
Yi Tseng2a340f72018-11-02 16:52:47 -0700123
Carmelo Casconea71b8492018-12-17 17:47:50 -0800124 final ManagedChannel channel;
Ray Milkeyfb503a72018-12-19 14:03:17 -0800125
126 channel = grpcChannelController.connectChannel(channelId, channelBuilder);
127
Yi Tseng2a340f72018-11-02 16:52:47 -0700128
Carmelo Casconea71b8492018-12-17 17:47:50 -0800129 final C client;
130 try {
131 client = createClientInstance(clientKey, channel);
132 } catch (Throwable e) {
133 log.error("Exception while creating {}", clientName(clientKey), e);
134 grpcChannelController.disconnectChannel(channelId);
Yi Tseng2a340f72018-11-02 16:52:47 -0700135 return false;
136 }
Carmelo Casconea71b8492018-12-17 17:47:50 -0800137
138 if (client == null) {
139 log.error("Unable to create {}, implementation returned null... (key={})",
140 clientName(clientKey), clientKey);
141 grpcChannelController.disconnectChannel(channelId);
142 return false;
143 }
144
Yi Tseng2a340f72018-11-02 16:52:47 -0700145 clientKeys.put(deviceId, clientKey);
146 clients.put(clientKey, client);
147 channelIds.put(deviceId, channelId);
148
149 return true;
150 }
151
152 protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
153
154 @Override
155 public C getClient(DeviceId deviceId) {
156 checkNotNull(deviceId);
157 return withDeviceLock(() -> doGetClient(deviceId), deviceId);
158 }
159
Yi Tsengd7716482018-10-31 15:34:30 -0700160 private C doGetClient(DeviceId deviceId) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700161 if (!clientKeys.containsKey(deviceId)) {
162 return null;
163 }
164 return clients.get(clientKeys.get(deviceId));
165 }
166
167 @Override
168 public void removeClient(DeviceId deviceId) {
169 checkNotNull(deviceId);
170 withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
171 }
172
173 private Void doRemoveClient(DeviceId deviceId) {
174 if (clientKeys.containsKey(deviceId)) {
175 final K clientKey = clientKeys.get(deviceId);
176 clients.get(clientKey).shutdown();
177 grpcChannelController.disconnectChannel(channelIds.get(deviceId));
178 clientKeys.remove(deviceId);
179 clients.remove(clientKey);
180 channelIds.remove(deviceId);
181 }
182 return null;
183 }
184
185 @Override
186 public boolean isReachable(DeviceId deviceId) {
187 checkNotNull(deviceId);
188 return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
189 }
190
Yi Tsengd7716482018-10-31 15:34:30 -0700191 private boolean doIsReachable(DeviceId deviceId) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700192 // Default behaviour checks only the gRPC channel, should
193 // check according to different gRPC service
194 if (!clientKeys.containsKey(deviceId)) {
Carmelo Casconea71b8492018-12-17 17:47:50 -0800195 log.debug("Missing client for {}, cannot check for reachability", deviceId);
Yi Tseng2a340f72018-11-02 16:52:47 -0700196 return false;
197 }
198 return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
199 }
200
201 private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
202 final Lock lock = stripedLocks.get(deviceId);
203 lock.lock();
204 try {
205 return task.get();
206 } finally {
207 lock.unlock();
208 }
209 }
Carmelo Casconea71b8492018-12-17 17:47:50 -0800210
211 private String clientName(GrpcClientKey key) {
212 return format("%s client for %s", key.serviceName(), key.deviceId());
213 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700214}