blob: 96a1671ec7f1d42d6dfe9070541e705a9a928303 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.grpc.ctl;
18
Carmelo Casconefb924072017-08-29 20:21:55 +020019import com.google.common.collect.ImmutableMap;
Carmelo Cascone158b8c42018-07-04 19:42:37 +020020import com.google.common.util.concurrent.Striped;
Andrea Campanella378e21a2017-06-07 12:09:59 +020021import io.grpc.ManagedChannel;
22import io.grpc.ManagedChannelBuilder;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040023import io.grpc.Status;
24import io.grpc.StatusRuntimeException;
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020025import org.onlab.util.Tools;
26import org.onosproject.cfg.ComponentConfigService;
Yi Tseng2a340f72018-11-02 16:52:47 -070027import org.onosproject.grpc.api.GrpcChannelController;
Andrea Campanella378e21a2017-06-07 12:09:59 +020028import org.onosproject.grpc.api.GrpcChannelId;
Carmelo Cascone6a1ae712018-08-10 12:19:47 -070029import org.onosproject.grpc.proto.dummy.Dummy;
30import org.onosproject.grpc.proto.dummy.DummyServiceGrpc;
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020031import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070032import org.osgi.service.component.annotations.Activate;
33import org.osgi.service.component.annotations.Component;
34import org.osgi.service.component.annotations.Deactivate;
35import org.osgi.service.component.annotations.Modified;
36import org.osgi.service.component.annotations.Reference;
37import org.osgi.service.component.annotations.ReferenceCardinality;
Andrea Campanella378e21a2017-06-07 12:09:59 +020038import org.slf4j.Logger;
39import org.slf4j.LoggerFactory;
40
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020041import java.util.Dictionary;
Andrea Campanella378e21a2017-06-07 12:09:59 +020042import java.util.Map;
43import java.util.Optional;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080044import java.util.concurrent.CompletableFuture;
Andrea Campanella378e21a2017-06-07 12:09:59 +020045import java.util.concurrent.ConcurrentHashMap;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040046import java.util.concurrent.TimeUnit;
Carmelo Cascone73f45302019-02-04 23:11:26 -080047import java.util.concurrent.atomic.AtomicBoolean;
Carmelo Casconefb924072017-08-29 20:21:55 +020048import java.util.concurrent.locks.Lock;
Carmelo Casconefb924072017-08-29 20:21:55 +020049
50import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone6d57f322018-12-13 23:15:17 -080051import static java.lang.String.format;
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -070052import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG;
53import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG_DEFAULT;
Andrea Campanella378e21a2017-06-07 12:09:59 +020054
55/**
Yi Tseng2a340f72018-11-02 16:52:47 -070056 * Default implementation of the GrpcChannelController.
Andrea Campanella378e21a2017-06-07 12:09:59 +020057 */
Ray Milkey5739b2c2018-11-06 14:04:51 -080058@Component(immediate = true, service = GrpcChannelController.class,
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -070059 property = {
Carmelo Cascone73f45302019-02-04 23:11:26 -080060 ENABLE_MESSAGE_LOG + ":Boolean=" + ENABLE_MESSAGE_LOG_DEFAULT,
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -070061 })
Yi Tseng2a340f72018-11-02 16:52:47 -070062public class GrpcChannelControllerImpl implements GrpcChannelController {
Andrea Campanella378e21a2017-06-07 12:09:59 +020063
Ray Milkeyd84f89b2018-08-17 14:54:17 -070064 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020065 protected ComponentConfigService componentConfigService;
66
Carmelo Cascone73f45302019-02-04 23:11:26 -080067 /**
68 * Indicates whether to log gRPC messages.
69 */
70 private final AtomicBoolean enableMessageLog = new AtomicBoolean(
71 ENABLE_MESSAGE_LOG_DEFAULT);
Carmelo Cascone8d99b172017-07-18 17:26:31 -040072
Carmelo Cascone47a853b2018-01-05 02:40:58 +010073 private final Logger log = LoggerFactory.getLogger(getClass());
Carmelo Cascone59f57de2017-07-11 19:55:09 -040074
Andrea Campanella378e21a2017-06-07 12:09:59 +020075 private Map<GrpcChannelId, ManagedChannel> channels;
Carmelo Cascone73f45302019-02-04 23:11:26 -080076 private Map<GrpcChannelId, GrpcLoggingInterceptor> interceptors;
77
Carmelo Cascone158b8c42018-07-04 19:42:37 +020078 private final Striped<Lock> channelLocks = Striped.lock(30);
Andrea Campanella378e21a2017-06-07 12:09:59 +020079
80 @Activate
81 public void activate() {
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020082 componentConfigService.registerProperties(getClass());
Andrea Campanella378e21a2017-06-07 12:09:59 +020083 channels = new ConcurrentHashMap<>();
Carmelo Cascone73f45302019-02-04 23:11:26 -080084 interceptors = new ConcurrentHashMap<>();
Andrea Campanella378e21a2017-06-07 12:09:59 +020085 log.info("Started");
86 }
87
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020088 @Modified
89 public void modified(ComponentContext context) {
90 if (context != null) {
91 Dictionary<?, ?> properties = context.getProperties();
Carmelo Cascone73f45302019-02-04 23:11:26 -080092 enableMessageLog.set(Tools.isPropertyEnabled(
93 properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT));
94 log.info("Configured. Logging of gRPC messages is {}",
Carmelo Cascone62d5c2e2019-03-07 18:53:17 -080095 enableMessageLog.get() ? "ENABLED" : "DISABLED");
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020096 }
97 }
98
Andrea Campanella378e21a2017-06-07 12:09:59 +020099 @Deactivate
100 public void deactivate() {
Andrea Campanellaa74bdba2018-05-15 16:45:00 +0200101 componentConfigService.unregisterProperties(getClass(), false);
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800102 channels.values().forEach(ManagedChannel::shutdownNow);
Andrea Campanella378e21a2017-06-07 12:09:59 +0200103 channels.clear();
Carmelo Cascone73f45302019-02-04 23:11:26 -0800104 channels = null;
105 interceptors.values().forEach(GrpcLoggingInterceptor::close);
106 interceptors.clear();
107 interceptors = null;
Andrea Campanella378e21a2017-06-07 12:09:59 +0200108 log.info("Stopped");
109 }
110
111 @Override
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100112 public ManagedChannel connectChannel(GrpcChannelId channelId,
Carmelo Casconea71b8492018-12-17 17:47:50 -0800113 ManagedChannelBuilder<?> channelBuilder) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200114 checkNotNull(channelId);
115 checkNotNull(channelBuilder);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400116
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200117 Lock lock = channelLocks.get(channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200118 lock.lock();
119
120 try {
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800121 if (channels.containsKey(channelId)) {
122 throw new IllegalArgumentException(format(
123 "A channel with ID '%s' already exists", channelId));
124 }
Carmelo Cascone73f45302019-02-04 23:11:26 -0800125
Carmelo Cascone62d5c2e2019-03-07 18:53:17 -0800126 final GrpcLoggingInterceptor interceptor = new GrpcLoggingInterceptor(
127 channelId, enableMessageLog);
128 channelBuilder.intercept(interceptor);
129
Carmelo Casconefb924072017-08-29 20:21:55 +0200130 ManagedChannel channel = channelBuilder.build();
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800131 // Forced connection API is still experimental. Use workaround...
Carmelo Casconefb924072017-08-29 20:21:55 +0200132 // channel.getState(true);
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800133 try {
134 doDummyMessage(channel);
135 } catch (StatusRuntimeException e) {
Carmelo Cascone62d5c2e2019-03-07 18:53:17 -0800136 interceptor.close();
Carmelo Casconea71b8492018-12-17 17:47:50 -0800137 shutdownNowAndWait(channel, channelId);
138 throw e;
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800139 }
140 // If here, channel is open.
Carmelo Casconefb924072017-08-29 20:21:55 +0200141 channels.put(channelId, channel);
Carmelo Cascone62d5c2e2019-03-07 18:53:17 -0800142 interceptors.put(channelId, interceptor);
Carmelo Casconefb924072017-08-29 20:21:55 +0200143 return channel;
144 } finally {
145 lock.unlock();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400146 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200147 }
148
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800149 private void doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100150 DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
151 .newBlockingStub(channel)
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400152 .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
153 try {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800154 //noinspection ResultOfMethodCallIgnored
155 dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
156 .getDefaultInstance());
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400157 } catch (StatusRuntimeException e) {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800158 if (!e.getStatus().equals(Status.UNIMPLEMENTED)) {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100159 // UNIMPLEMENTED means that the server received our message but
160 // doesn't know how to handle it. Hence, channel is open.
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800161 throw e;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400162 }
163 }
164 }
165
166 @Override
Andrea Campanella378e21a2017-06-07 12:09:59 +0200167 public void disconnectChannel(GrpcChannelId channelId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200168 checkNotNull(channelId);
169
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200170 Lock lock = channelLocks.get(channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200171 lock.lock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400172 try {
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800173 final ManagedChannel channel = channels.remove(channelId);
Carmelo Casconea71b8492018-12-17 17:47:50 -0800174 if (channel != null) {
175 shutdownNowAndWait(channel, channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200176 }
Carmelo Cascone73f45302019-02-04 23:11:26 -0800177 final GrpcLoggingInterceptor interceptor = interceptors.remove(channelId);
178 if (interceptor != null) {
179 interceptor.close();
180 }
Carmelo Casconefb924072017-08-29 20:21:55 +0200181 } finally {
182 lock.unlock();
183 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200184 }
185
Carmelo Casconea71b8492018-12-17 17:47:50 -0800186 private void shutdownNowAndWait(ManagedChannel channel, GrpcChannelId channelId) {
187 try {
188 if (!channel.shutdownNow()
189 .awaitTermination(5, TimeUnit.SECONDS)) {
190 log.error("Channel '{}' didn't terminate, although we " +
191 "triggered a shutdown and waited",
192 channelId);
193 }
194 } catch (InterruptedException e) {
195 log.warn("Channel {} didn't shutdown in time", channelId);
196 Thread.currentThread().interrupt();
197 }
198 }
199
Andrea Campanella378e21a2017-06-07 12:09:59 +0200200 @Override
201 public Map<GrpcChannelId, ManagedChannel> getChannels() {
Carmelo Casconefb924072017-08-29 20:21:55 +0200202 return ImmutableMap.copyOf(channels);
Andrea Campanella378e21a2017-06-07 12:09:59 +0200203 }
204
205 @Override
Andrea Campanella378e21a2017-06-07 12:09:59 +0200206 public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200207 checkNotNull(channelId);
208
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200209 Lock lock = channelLocks.get(channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200210 lock.lock();
Carmelo Casconefb924072017-08-29 20:21:55 +0200211 try {
212 return Optional.ofNullable(channels.get(channelId));
213 } finally {
214 lock.unlock();
215 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200216 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400217
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800218 @Override
219 public CompletableFuture<Boolean> probeChannel(GrpcChannelId channelId) {
220 final ManagedChannel channel = channels.get(channelId);
221 if (channel == null) {
222 log.warn("Unable to find any channel with ID {}, cannot send probe",
223 channelId);
224 return CompletableFuture.completedFuture(false);
225 }
226 return CompletableFuture.supplyAsync(() -> {
227 try {
228 doDummyMessage(channel);
229 return true;
230 } catch (StatusRuntimeException e) {
231 log.debug("Probe for {} failed", channelId);
232 log.debug("", e);
233 return false;
234 }
235 });
236 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200237}