blob: 375ff329ca6ff63e0cfcc815f9ce773469154001 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.grpc.ctl;
18
19import com.google.common.collect.Maps;
20import com.google.common.util.concurrent.Striped;
21import io.grpc.ManagedChannel;
Ray Milkeyfa066ed2018-12-14 21:53:12 +000022import io.grpc.ManagedChannelBuilder;
Yi Tseng2a340f72018-11-02 16:52:47 -070023import io.grpc.netty.NettyChannelBuilder;
Yi Tseng2a340f72018-11-02 16:52:47 -070024import org.onosproject.event.AbstractListenerManager;
25import org.onosproject.event.Event;
26import org.onosproject.event.EventListener;
27import org.onosproject.grpc.api.GrpcChannelController;
28import org.onosproject.grpc.api.GrpcChannelId;
29import org.onosproject.grpc.api.GrpcClient;
30import org.onosproject.grpc.api.GrpcClientController;
31import org.onosproject.grpc.api.GrpcClientKey;
32import org.onosproject.net.DeviceId;
Ray Milkey5739b2c2018-11-06 14:04:51 -080033import org.osgi.service.component.annotations.Activate;
Ray Milkey5739b2c2018-11-06 14:04:51 -080034import org.osgi.service.component.annotations.Deactivate;
35import org.osgi.service.component.annotations.Reference;
36import org.osgi.service.component.annotations.ReferenceCardinality;
Yi Tseng2a340f72018-11-02 16:52:47 -070037import org.slf4j.Logger;
38
39import java.io.IOException;
40import java.util.Map;
41import java.util.concurrent.locks.Lock;
42import java.util.function.Supplier;
43
44import static com.google.common.base.Preconditions.checkNotNull;
45import static org.slf4j.LoggerFactory.getLogger;
46
47/**
48 * Abstract class of a gRPC based client controller for specific gRPC client
49 * which provides basic gRPC client management and thread safe mechanism.
50 *
51 * @param <C> the gRPC client type
52 * @param <K> the key type of the gRPC client
53 * @param <E> the event type of the gRPC client
54 * @param <L> the event listener of event {@link E}
55 */
Yi Tseng2a340f72018-11-02 16:52:47 -070056public abstract class AbstractGrpcClientController
57 <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
58 extends AbstractListenerManager<E, L>
59 implements GrpcClientController<K, C> {
60
61 /**
62 * The default max inbound message size (MB).
63 */
64 private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
65 private static final int MEGABYTES = 1024 * 1024;
66 private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
67
68 private final Logger log = getLogger(getClass());
69 private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
70 private final Map<K, C> clients = Maps.newHashMap();
71 private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
72 private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
73
Ray Milkey5739b2c2018-11-06 14:04:51 -080074 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconea46f5542018-12-12 23:41:01 -080075 protected GrpcChannelController grpcChannelController;
Yi Tseng2a340f72018-11-02 16:52:47 -070076
77 @Activate
78 public void activate() {
79 log.info("Started");
80 }
81
82 @Deactivate
83 public void deactivate() {
84 clientKeys.keySet().forEach(this::removeClient);
85 clientKeys.clear();
86 clients.clear();
87 channelIds.clear();
88 log.info("Stopped");
89 }
90
91 @Override
92 public boolean createClient(K clientKey) {
93 checkNotNull(clientKey);
Ray Milkeyfa066ed2018-12-14 21:53:12 +000094 return withDeviceLock(() -> doCreateClient(clientKey), clientKey.deviceId());
Yi Tseng2a340f72018-11-02 16:52:47 -070095 }
96
Ray Milkeyfa066ed2018-12-14 21:53:12 +000097 private boolean doCreateClient(K clientKey) {
Yi Tseng2a340f72018-11-02 16:52:47 -070098 DeviceId deviceId = clientKey.deviceId();
99 String serverAddr = clientKey.serverAddr();
100 int serverPort = clientKey.serverPort();
101
102 if (clientKeys.containsKey(deviceId)) {
103 final GrpcClientKey existingKey = clientKeys.get(deviceId);
104 if (clientKey.equals(existingKey)) {
105 log.debug("Not creating client for {} as it already exists (key={})...",
106 deviceId, clientKey);
107 return true;
108 } else {
109 log.info("Requested client for {} with new " +
110 "endpoint, removing old client (key={})...",
111 deviceId, clientKey);
112 doRemoveClient(deviceId);
113 }
114 }
Ray Milkeyfa066ed2018-12-14 21:53:12 +0000115 log.info("Creating client for {} (server={}:{})...",
116 deviceId, serverAddr, serverPort);
Yi Tseng2a340f72018-11-02 16:52:47 -0700117 GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(), clientKey.toString());
Ray Milkeyfa066ed2018-12-14 21:53:12 +0000118 ManagedChannelBuilder channelBuilder = NettyChannelBuilder
Yi Tseng2a340f72018-11-02 16:52:47 -0700119 .forAddress(serverAddr, serverPort)
Ray Milkeyfa066ed2018-12-14 21:53:12 +0000120 .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES)
121 .usePlaintext();
Yi Tseng2a340f72018-11-02 16:52:47 -0700122
123 ManagedChannel channel;
124 try {
125 channel = grpcChannelController.connectChannel(channelId, channelBuilder);
126 } catch (IOException e) {
Ray Milkeyfa066ed2018-12-14 21:53:12 +0000127 log.warn("Unable to connect to gRPC server of {}: {}",
128 clientKey.deviceId(), e.getMessage());
Yi Tseng2a340f72018-11-02 16:52:47 -0700129 return false;
130 }
131
132 C client = createClientInstance(clientKey, channel);
133 if (client == null) {
134 log.warn("Cannot create client for {} (key={})", deviceId, clientKey);
135 return false;
136 }
137 clientKeys.put(deviceId, clientKey);
138 clients.put(clientKey, client);
139 channelIds.put(deviceId, channelId);
140
141 return true;
142 }
143
144 protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
145
146 @Override
147 public C getClient(DeviceId deviceId) {
148 checkNotNull(deviceId);
149 return withDeviceLock(() -> doGetClient(deviceId), deviceId);
150 }
151
Yi Tsengd7716482018-10-31 15:34:30 -0700152 private C doGetClient(DeviceId deviceId) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700153 if (!clientKeys.containsKey(deviceId)) {
154 return null;
155 }
156 return clients.get(clientKeys.get(deviceId));
157 }
158
159 @Override
160 public void removeClient(DeviceId deviceId) {
161 checkNotNull(deviceId);
162 withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
163 }
164
165 private Void doRemoveClient(DeviceId deviceId) {
166 if (clientKeys.containsKey(deviceId)) {
167 final K clientKey = clientKeys.get(deviceId);
168 clients.get(clientKey).shutdown();
169 grpcChannelController.disconnectChannel(channelIds.get(deviceId));
170 clientKeys.remove(deviceId);
171 clients.remove(clientKey);
172 channelIds.remove(deviceId);
173 }
174 return null;
175 }
176
177 @Override
178 public boolean isReachable(DeviceId deviceId) {
179 checkNotNull(deviceId);
180 return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
181 }
182
Yi Tsengd7716482018-10-31 15:34:30 -0700183 private boolean doIsReachable(DeviceId deviceId) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700184 // Default behaviour checks only the gRPC channel, should
185 // check according to different gRPC service
186 if (!clientKeys.containsKey(deviceId)) {
187 log.debug("No client for {}, can't check for reachability", deviceId);
188 return false;
189 }
190 return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
191 }
192
193 private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
194 final Lock lock = stripedLocks.get(deviceId);
195 lock.lock();
196 try {
197 return task.get();
198 } finally {
199 lock.unlock();
200 }
201 }
202}