blob: 87c9426a5916ed7e794bfa6f4cd916f84127057d [file] [log] [blame]
Andrea Campanella378e21a2017-06-07 12:09:59 +02001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella378e21a2017-06-07 12:09:59 +02003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.grpc.ctl;
18
Carmelo Casconefb924072017-08-29 20:21:55 +020019import com.google.common.collect.ImmutableMap;
Carmelo Cascone158b8c42018-07-04 19:42:37 +020020import com.google.common.util.concurrent.Striped;
Andrea Campanella378e21a2017-06-07 12:09:59 +020021import io.grpc.ManagedChannel;
22import io.grpc.ManagedChannelBuilder;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040023import io.grpc.Status;
24import io.grpc.StatusRuntimeException;
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020025import org.onlab.util.Tools;
26import org.onosproject.cfg.ComponentConfigService;
Yi Tseng2a340f72018-11-02 16:52:47 -070027import org.onosproject.grpc.api.GrpcChannelController;
Andrea Campanella378e21a2017-06-07 12:09:59 +020028import org.onosproject.grpc.api.GrpcChannelId;
Carmelo Cascone6a1ae712018-08-10 12:19:47 -070029import org.onosproject.grpc.proto.dummy.Dummy;
30import org.onosproject.grpc.proto.dummy.DummyServiceGrpc;
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020031import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070032import org.osgi.service.component.annotations.Activate;
33import org.osgi.service.component.annotations.Component;
34import org.osgi.service.component.annotations.Deactivate;
35import org.osgi.service.component.annotations.Modified;
36import org.osgi.service.component.annotations.Reference;
37import org.osgi.service.component.annotations.ReferenceCardinality;
Andrea Campanella378e21a2017-06-07 12:09:59 +020038import org.slf4j.Logger;
39import org.slf4j.LoggerFactory;
40
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020041import java.util.Dictionary;
Andrea Campanella378e21a2017-06-07 12:09:59 +020042import java.util.Map;
43import java.util.Optional;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080044import java.util.concurrent.CompletableFuture;
Andrea Campanella378e21a2017-06-07 12:09:59 +020045import java.util.concurrent.ConcurrentHashMap;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040046import java.util.concurrent.TimeUnit;
Carmelo Cascone73f45302019-02-04 23:11:26 -080047import java.util.concurrent.atomic.AtomicBoolean;
Carmelo Casconefb924072017-08-29 20:21:55 +020048import java.util.concurrent.locks.Lock;
Carmelo Casconefb924072017-08-29 20:21:55 +020049
50import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone6d57f322018-12-13 23:15:17 -080051import static java.lang.String.format;
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -070052import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG;
53import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG_DEFAULT;
Andrea Campanella378e21a2017-06-07 12:09:59 +020054
55/**
Yi Tseng2a340f72018-11-02 16:52:47 -070056 * Default implementation of the GrpcChannelController.
Andrea Campanella378e21a2017-06-07 12:09:59 +020057 */
Ray Milkey5739b2c2018-11-06 14:04:51 -080058@Component(immediate = true, service = GrpcChannelController.class,
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -070059 property = {
Carmelo Cascone73f45302019-02-04 23:11:26 -080060 ENABLE_MESSAGE_LOG + ":Boolean=" + ENABLE_MESSAGE_LOG_DEFAULT,
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -070061 })
Yi Tseng2a340f72018-11-02 16:52:47 -070062public class GrpcChannelControllerImpl implements GrpcChannelController {
Andrea Campanella378e21a2017-06-07 12:09:59 +020063
Ray Milkeyd84f89b2018-08-17 14:54:17 -070064 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020065 protected ComponentConfigService componentConfigService;
66
Carmelo Cascone73f45302019-02-04 23:11:26 -080067 /**
68 * Indicates whether to log gRPC messages.
69 */
70 private final AtomicBoolean enableMessageLog = new AtomicBoolean(
71 ENABLE_MESSAGE_LOG_DEFAULT);
Carmelo Cascone8d99b172017-07-18 17:26:31 -040072
Carmelo Cascone47a853b2018-01-05 02:40:58 +010073 private final Logger log = LoggerFactory.getLogger(getClass());
Carmelo Cascone59f57de2017-07-11 19:55:09 -040074
Andrea Campanella378e21a2017-06-07 12:09:59 +020075 private Map<GrpcChannelId, ManagedChannel> channels;
Carmelo Cascone73f45302019-02-04 23:11:26 -080076 private Map<GrpcChannelId, GrpcLoggingInterceptor> interceptors;
77
Carmelo Cascone158b8c42018-07-04 19:42:37 +020078 private final Striped<Lock> channelLocks = Striped.lock(30);
Andrea Campanella378e21a2017-06-07 12:09:59 +020079
80 @Activate
81 public void activate() {
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020082 componentConfigService.registerProperties(getClass());
Andrea Campanella378e21a2017-06-07 12:09:59 +020083 channels = new ConcurrentHashMap<>();
Carmelo Cascone73f45302019-02-04 23:11:26 -080084 interceptors = new ConcurrentHashMap<>();
Andrea Campanella378e21a2017-06-07 12:09:59 +020085 log.info("Started");
86 }
87
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020088 @Modified
89 public void modified(ComponentContext context) {
90 if (context != null) {
91 Dictionary<?, ?> properties = context.getProperties();
Carmelo Cascone73f45302019-02-04 23:11:26 -080092 enableMessageLog.set(Tools.isPropertyEnabled(
93 properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT));
94 log.info("Configured. Logging of gRPC messages is {}",
95 enableMessageLog.get()
96 ? "ENABLED for new channels"
97 : "DISABLED for new and existing channels");
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020098 }
99 }
100
Andrea Campanella378e21a2017-06-07 12:09:59 +0200101 @Deactivate
102 public void deactivate() {
Andrea Campanellaa74bdba2018-05-15 16:45:00 +0200103 componentConfigService.unregisterProperties(getClass(), false);
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800104 channels.values().forEach(ManagedChannel::shutdownNow);
Andrea Campanella378e21a2017-06-07 12:09:59 +0200105 channels.clear();
Carmelo Cascone73f45302019-02-04 23:11:26 -0800106 channels = null;
107 interceptors.values().forEach(GrpcLoggingInterceptor::close);
108 interceptors.clear();
109 interceptors = null;
Andrea Campanella378e21a2017-06-07 12:09:59 +0200110 log.info("Stopped");
111 }
112
113 @Override
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100114 public ManagedChannel connectChannel(GrpcChannelId channelId,
Carmelo Casconea71b8492018-12-17 17:47:50 -0800115 ManagedChannelBuilder<?> channelBuilder) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200116 checkNotNull(channelId);
117 checkNotNull(channelBuilder);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400118
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200119 Lock lock = channelLocks.get(channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200120 lock.lock();
121
122 try {
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800123 if (channels.containsKey(channelId)) {
124 throw new IllegalArgumentException(format(
125 "A channel with ID '%s' already exists", channelId));
126 }
Carmelo Cascone73f45302019-02-04 23:11:26 -0800127
128 GrpcLoggingInterceptor interceptor = null;
129 if (enableMessageLog.get()) {
130 interceptor = new GrpcLoggingInterceptor(channelId, enableMessageLog);
131 channelBuilder.intercept(interceptor);
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800132 }
Carmelo Casconefb924072017-08-29 20:21:55 +0200133 ManagedChannel channel = channelBuilder.build();
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800134 // Forced connection API is still experimental. Use workaround...
Carmelo Casconefb924072017-08-29 20:21:55 +0200135 // channel.getState(true);
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800136 try {
137 doDummyMessage(channel);
138 } catch (StatusRuntimeException e) {
Carmelo Cascone73f45302019-02-04 23:11:26 -0800139 if (interceptor != null) {
140 interceptor.close();
141 }
Carmelo Casconea71b8492018-12-17 17:47:50 -0800142 shutdownNowAndWait(channel, channelId);
143 throw e;
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800144 }
145 // If here, channel is open.
Carmelo Casconefb924072017-08-29 20:21:55 +0200146 channels.put(channelId, channel);
Carmelo Cascone73f45302019-02-04 23:11:26 -0800147 if (interceptor != null) {
148 interceptors.put(channelId, interceptor);
149 }
Carmelo Casconefb924072017-08-29 20:21:55 +0200150 return channel;
151 } finally {
152 lock.unlock();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400153 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200154 }
155
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800156 private void doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100157 DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
158 .newBlockingStub(channel)
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400159 .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
160 try {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800161 //noinspection ResultOfMethodCallIgnored
162 dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
163 .getDefaultInstance());
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400164 } catch (StatusRuntimeException e) {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800165 if (!e.getStatus().equals(Status.UNIMPLEMENTED)) {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100166 // UNIMPLEMENTED means that the server received our message but
167 // doesn't know how to handle it. Hence, channel is open.
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800168 throw e;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400169 }
170 }
171 }
172
173 @Override
Andrea Campanella378e21a2017-06-07 12:09:59 +0200174 public void disconnectChannel(GrpcChannelId channelId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200175 checkNotNull(channelId);
176
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200177 Lock lock = channelLocks.get(channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200178 lock.lock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400179 try {
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800180 final ManagedChannel channel = channels.remove(channelId);
Carmelo Casconea71b8492018-12-17 17:47:50 -0800181 if (channel != null) {
182 shutdownNowAndWait(channel, channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200183 }
Carmelo Cascone73f45302019-02-04 23:11:26 -0800184 final GrpcLoggingInterceptor interceptor = interceptors.remove(channelId);
185 if (interceptor != null) {
186 interceptor.close();
187 }
Carmelo Casconefb924072017-08-29 20:21:55 +0200188 } finally {
189 lock.unlock();
190 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200191 }
192
Carmelo Casconea71b8492018-12-17 17:47:50 -0800193 private void shutdownNowAndWait(ManagedChannel channel, GrpcChannelId channelId) {
194 try {
195 if (!channel.shutdownNow()
196 .awaitTermination(5, TimeUnit.SECONDS)) {
197 log.error("Channel '{}' didn't terminate, although we " +
198 "triggered a shutdown and waited",
199 channelId);
200 }
201 } catch (InterruptedException e) {
202 log.warn("Channel {} didn't shutdown in time", channelId);
203 Thread.currentThread().interrupt();
204 }
205 }
206
Andrea Campanella378e21a2017-06-07 12:09:59 +0200207 @Override
208 public Map<GrpcChannelId, ManagedChannel> getChannels() {
Carmelo Casconefb924072017-08-29 20:21:55 +0200209 return ImmutableMap.copyOf(channels);
Andrea Campanella378e21a2017-06-07 12:09:59 +0200210 }
211
212 @Override
Andrea Campanella378e21a2017-06-07 12:09:59 +0200213 public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200214 checkNotNull(channelId);
215
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200216 Lock lock = channelLocks.get(channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200217 lock.lock();
Carmelo Casconefb924072017-08-29 20:21:55 +0200218 try {
219 return Optional.ofNullable(channels.get(channelId));
220 } finally {
221 lock.unlock();
222 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200223 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400224
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800225 @Override
226 public CompletableFuture<Boolean> probeChannel(GrpcChannelId channelId) {
227 final ManagedChannel channel = channels.get(channelId);
228 if (channel == null) {
229 log.warn("Unable to find any channel with ID {}, cannot send probe",
230 channelId);
231 return CompletableFuture.completedFuture(false);
232 }
233 return CompletableFuture.supplyAsync(() -> {
234 try {
235 doDummyMessage(channel);
236 return true;
237 } catch (StatusRuntimeException e) {
238 log.debug("Probe for {} failed", channelId);
239 log.debug("", e);
240 return false;
241 }
242 });
243 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200244}