blob: 9ae14dadf710cab44e0dc435a3a7ad7e77f370a9 [file] [log] [blame]
Yi Tseng2a340f72018-11-02 16:52:47 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.grpc.ctl;
18
19import com.google.common.collect.Maps;
20import com.google.common.util.concurrent.Striped;
21import io.grpc.ManagedChannel;
22import io.grpc.ManagedChannelBuilder;
23import io.grpc.netty.NettyChannelBuilder;
24import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29
30import org.onosproject.event.AbstractListenerManager;
31import org.onosproject.event.Event;
32import org.onosproject.event.EventListener;
33import org.onosproject.grpc.api.GrpcChannelController;
34import org.onosproject.grpc.api.GrpcChannelId;
35import org.onosproject.grpc.api.GrpcClient;
36import org.onosproject.grpc.api.GrpcClientController;
37import org.onosproject.grpc.api.GrpcClientKey;
38import org.onosproject.net.DeviceId;
39import org.slf4j.Logger;
40
41import java.io.IOException;
42import java.util.Map;
43import java.util.concurrent.locks.Lock;
44import java.util.function.Supplier;
45
46import static com.google.common.base.Preconditions.checkNotNull;
47import static org.slf4j.LoggerFactory.getLogger;
48
49/**
50 * Abstract class of a gRPC based client controller for specific gRPC client
51 * which provides basic gRPC client management and thread safe mechanism.
52 *
53 * @param <C> the gRPC client type
54 * @param <K> the key type of the gRPC client
55 * @param <E> the event type of the gRPC client
56 * @param <L> the event listener of event {@link E}
57 */
58@Component
59public abstract class AbstractGrpcClientController
60 <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
61 extends AbstractListenerManager<E, L>
62 implements GrpcClientController<K, C> {
63
64 /**
65 * The default max inbound message size (MB).
66 */
67 private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
68 private static final int MEGABYTES = 1024 * 1024;
69 private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
70
71 private final Logger log = getLogger(getClass());
72 private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
73 private final Map<K, C> clients = Maps.newHashMap();
74 private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
75 private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 private GrpcChannelController grpcChannelController;
79
80 @Activate
81 public void activate() {
82 log.info("Started");
83 }
84
85 @Deactivate
86 public void deactivate() {
87 clientKeys.keySet().forEach(this::removeClient);
88 clientKeys.clear();
89 clients.clear();
90 channelIds.clear();
91 log.info("Stopped");
92 }
93
94 @Override
95 public boolean createClient(K clientKey) {
96 checkNotNull(clientKey);
97 return withDeviceLock(() -> doCreateClient(clientKey), clientKey.deviceId());
98 }
99
100 private boolean doCreateClient(K clientKey) {
101 DeviceId deviceId = clientKey.deviceId();
102 String serverAddr = clientKey.serverAddr();
103 int serverPort = clientKey.serverPort();
104
105 if (clientKeys.containsKey(deviceId)) {
106 final GrpcClientKey existingKey = clientKeys.get(deviceId);
107 if (clientKey.equals(existingKey)) {
108 log.debug("Not creating client for {} as it already exists (key={})...",
109 deviceId, clientKey);
110 return true;
111 } else {
112 log.info("Requested client for {} with new " +
113 "endpoint, removing old client (key={})...",
114 deviceId, clientKey);
115 doRemoveClient(deviceId);
116 }
117 }
118 log.info("Creating client for {} (server={}:{})...",
119 deviceId, serverAddr, serverPort);
120 GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(), clientKey.toString());
121 ManagedChannelBuilder channelBuilder = NettyChannelBuilder
122 .forAddress(serverAddr, serverPort)
123 .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES)
124 .usePlaintext();
125
126 ManagedChannel channel;
127 try {
128 channel = grpcChannelController.connectChannel(channelId, channelBuilder);
129 } catch (IOException e) {
130 log.warn("Unable to connect to gRPC server of {}: {}",
131 clientKey.deviceId(), e.getMessage());
132 return false;
133 }
134
135 C client = createClientInstance(clientKey, channel);
136 if (client == null) {
137 log.warn("Cannot create client for {} (key={})", deviceId, clientKey);
138 return false;
139 }
140 clientKeys.put(deviceId, clientKey);
141 clients.put(clientKey, client);
142 channelIds.put(deviceId, channelId);
143
144 return true;
145 }
146
147 protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
148
149 @Override
150 public C getClient(DeviceId deviceId) {
151 checkNotNull(deviceId);
152 return withDeviceLock(() -> doGetClient(deviceId), deviceId);
153 }
154
155 protected C doGetClient(DeviceId deviceId) {
156 if (!clientKeys.containsKey(deviceId)) {
157 return null;
158 }
159 return clients.get(clientKeys.get(deviceId));
160 }
161
162 @Override
163 public void removeClient(DeviceId deviceId) {
164 checkNotNull(deviceId);
165 withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
166 }
167
168 private Void doRemoveClient(DeviceId deviceId) {
169 if (clientKeys.containsKey(deviceId)) {
170 final K clientKey = clientKeys.get(deviceId);
171 clients.get(clientKey).shutdown();
172 grpcChannelController.disconnectChannel(channelIds.get(deviceId));
173 clientKeys.remove(deviceId);
174 clients.remove(clientKey);
175 channelIds.remove(deviceId);
176 }
177 return null;
178 }
179
180 @Override
181 public boolean isReachable(DeviceId deviceId) {
182 checkNotNull(deviceId);
183 return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
184 }
185
186 protected boolean doIsReachable(DeviceId deviceId) {
187 // Default behaviour checks only the gRPC channel, should
188 // check according to different gRPC service
189 if (!clientKeys.containsKey(deviceId)) {
190 log.debug("No client for {}, can't check for reachability", deviceId);
191 return false;
192 }
193 return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
194 }
195
196 private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
197 final Lock lock = stripedLocks.get(deviceId);
198 lock.lock();
199 try {
200 return task.get();
201 } finally {
202 lock.unlock();
203 }
204 }
205}