blob: bf2bbf1ae63c9cc84f585d05983487cb02f85eff [file] [log] [blame]
Yi Tseng2a340f72018-11-02 16:52:47 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.grpc.ctl;
18
19import com.google.common.collect.Maps;
20import com.google.common.util.concurrent.Striped;
21import io.grpc.ManagedChannel;
22import io.grpc.ManagedChannelBuilder;
23import io.grpc.netty.NettyChannelBuilder;
Yi Tseng2a340f72018-11-02 16:52:47 -070024import org.onosproject.event.AbstractListenerManager;
25import org.onosproject.event.Event;
26import org.onosproject.event.EventListener;
27import org.onosproject.grpc.api.GrpcChannelController;
28import org.onosproject.grpc.api.GrpcChannelId;
29import org.onosproject.grpc.api.GrpcClient;
30import org.onosproject.grpc.api.GrpcClientController;
31import org.onosproject.grpc.api.GrpcClientKey;
32import org.onosproject.net.DeviceId;
Ray Milkey5739b2c2018-11-06 14:04:51 -080033import org.osgi.service.component.annotations.Activate;
34import org.osgi.service.component.annotations.Component;
35import org.osgi.service.component.annotations.Deactivate;
36import org.osgi.service.component.annotations.Reference;
37import org.osgi.service.component.annotations.ReferenceCardinality;
Yi Tseng2a340f72018-11-02 16:52:47 -070038import org.slf4j.Logger;
39
40import java.io.IOException;
41import java.util.Map;
42import java.util.concurrent.locks.Lock;
43import java.util.function.Supplier;
44
45import static com.google.common.base.Preconditions.checkNotNull;
46import static org.slf4j.LoggerFactory.getLogger;
47
48/**
49 * Abstract class of a gRPC based client controller for specific gRPC client
50 * which provides basic gRPC client management and thread safe mechanism.
51 *
52 * @param <C> the gRPC client type
53 * @param <K> the key type of the gRPC client
54 * @param <E> the event type of the gRPC client
55 * @param <L> the event listener of event {@link E}
56 */
57@Component
58public abstract class AbstractGrpcClientController
59 <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
60 extends AbstractListenerManager<E, L>
61 implements GrpcClientController<K, C> {
62
63 /**
64 * The default max inbound message size (MB).
65 */
66 private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
67 private static final int MEGABYTES = 1024 * 1024;
68 private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
69
70 private final Logger log = getLogger(getClass());
71 private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
72 private final Map<K, C> clients = Maps.newHashMap();
73 private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
74 private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
75
Ray Milkey5739b2c2018-11-06 14:04:51 -080076 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yi Tseng2a340f72018-11-02 16:52:47 -070077 private GrpcChannelController grpcChannelController;
78
79 @Activate
80 public void activate() {
81 log.info("Started");
82 }
83
84 @Deactivate
85 public void deactivate() {
86 clientKeys.keySet().forEach(this::removeClient);
87 clientKeys.clear();
88 clients.clear();
89 channelIds.clear();
90 log.info("Stopped");
91 }
92
93 @Override
94 public boolean createClient(K clientKey) {
95 checkNotNull(clientKey);
96 return withDeviceLock(() -> doCreateClient(clientKey), clientKey.deviceId());
97 }
98
99 private boolean doCreateClient(K clientKey) {
100 DeviceId deviceId = clientKey.deviceId();
101 String serverAddr = clientKey.serverAddr();
102 int serverPort = clientKey.serverPort();
103
104 if (clientKeys.containsKey(deviceId)) {
105 final GrpcClientKey existingKey = clientKeys.get(deviceId);
106 if (clientKey.equals(existingKey)) {
107 log.debug("Not creating client for {} as it already exists (key={})...",
108 deviceId, clientKey);
109 return true;
110 } else {
111 log.info("Requested client for {} with new " +
112 "endpoint, removing old client (key={})...",
113 deviceId, clientKey);
114 doRemoveClient(deviceId);
115 }
116 }
117 log.info("Creating client for {} (server={}:{})...",
118 deviceId, serverAddr, serverPort);
119 GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(), clientKey.toString());
120 ManagedChannelBuilder channelBuilder = NettyChannelBuilder
121 .forAddress(serverAddr, serverPort)
122 .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES)
123 .usePlaintext();
124
125 ManagedChannel channel;
126 try {
127 channel = grpcChannelController.connectChannel(channelId, channelBuilder);
128 } catch (IOException e) {
129 log.warn("Unable to connect to gRPC server of {}: {}",
130 clientKey.deviceId(), e.getMessage());
131 return false;
132 }
133
134 C client = createClientInstance(clientKey, channel);
135 if (client == null) {
136 log.warn("Cannot create client for {} (key={})", deviceId, clientKey);
137 return false;
138 }
139 clientKeys.put(deviceId, clientKey);
140 clients.put(clientKey, client);
141 channelIds.put(deviceId, channelId);
142
143 return true;
144 }
145
146 protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
147
148 @Override
149 public C getClient(DeviceId deviceId) {
150 checkNotNull(deviceId);
151 return withDeviceLock(() -> doGetClient(deviceId), deviceId);
152 }
153
Yi Tsengd7716482018-10-31 15:34:30 -0700154 private C doGetClient(DeviceId deviceId) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700155 if (!clientKeys.containsKey(deviceId)) {
156 return null;
157 }
158 return clients.get(clientKeys.get(deviceId));
159 }
160
161 @Override
162 public void removeClient(DeviceId deviceId) {
163 checkNotNull(deviceId);
164 withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
165 }
166
167 private Void doRemoveClient(DeviceId deviceId) {
168 if (clientKeys.containsKey(deviceId)) {
169 final K clientKey = clientKeys.get(deviceId);
170 clients.get(clientKey).shutdown();
171 grpcChannelController.disconnectChannel(channelIds.get(deviceId));
172 clientKeys.remove(deviceId);
173 clients.remove(clientKey);
174 channelIds.remove(deviceId);
175 }
176 return null;
177 }
178
179 @Override
180 public boolean isReachable(DeviceId deviceId) {
181 checkNotNull(deviceId);
182 return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
183 }
184
Yi Tsengd7716482018-10-31 15:34:30 -0700185 private boolean doIsReachable(DeviceId deviceId) {
Yi Tseng2a340f72018-11-02 16:52:47 -0700186 // Default behaviour checks only the gRPC channel, should
187 // check according to different gRPC service
188 if (!clientKeys.containsKey(deviceId)) {
189 log.debug("No client for {}, can't check for reachability", deviceId);
190 return false;
191 }
192 return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
193 }
194
195 private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
196 final Lock lock = stripedLocks.get(deviceId);
197 lock.lock();
198 try {
199 return task.get();
200 } finally {
201 lock.unlock();
202 }
203 }
204}