blob: ca0c8c37b5191b89e717abd635f7f0aa2b51cd23 [file] [log] [blame]
Andrea Campanella378e21a2017-06-07 12:09:59 +02001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella378e21a2017-06-07 12:09:59 +02003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.grpc.ctl;
18
Carmelo Casconefb924072017-08-29 20:21:55 +020019import com.google.common.collect.ImmutableMap;
Carmelo Cascone158b8c42018-07-04 19:42:37 +020020import com.google.common.util.concurrent.Striped;
Andrea Campanella378e21a2017-06-07 12:09:59 +020021import io.grpc.ManagedChannel;
22import io.grpc.ManagedChannelBuilder;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040023import io.grpc.Status;
24import io.grpc.StatusRuntimeException;
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020025import org.onlab.util.Tools;
26import org.onosproject.cfg.ComponentConfigService;
Yi Tseng2a340f72018-11-02 16:52:47 -070027import org.onosproject.grpc.api.GrpcChannelController;
Andrea Campanella378e21a2017-06-07 12:09:59 +020028import org.onosproject.grpc.api.GrpcChannelId;
Carmelo Cascone6a1ae712018-08-10 12:19:47 -070029import org.onosproject.grpc.proto.dummy.Dummy;
30import org.onosproject.grpc.proto.dummy.DummyServiceGrpc;
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020031import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070032import org.osgi.service.component.annotations.Activate;
33import org.osgi.service.component.annotations.Component;
34import org.osgi.service.component.annotations.Deactivate;
35import org.osgi.service.component.annotations.Modified;
36import org.osgi.service.component.annotations.Reference;
37import org.osgi.service.component.annotations.ReferenceCardinality;
Andrea Campanella378e21a2017-06-07 12:09:59 +020038import org.slf4j.Logger;
39import org.slf4j.LoggerFactory;
40
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020041import java.util.Dictionary;
Andrea Campanella378e21a2017-06-07 12:09:59 +020042import java.util.Map;
43import java.util.Optional;
Andrea Campanella378e21a2017-06-07 12:09:59 +020044import java.util.concurrent.ConcurrentHashMap;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040045import java.util.concurrent.TimeUnit;
Carmelo Cascone73f45302019-02-04 23:11:26 -080046import java.util.concurrent.atomic.AtomicBoolean;
Carmelo Casconefb924072017-08-29 20:21:55 +020047import java.util.concurrent.locks.Lock;
Carmelo Casconefb924072017-08-29 20:21:55 +020048
49import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone6d57f322018-12-13 23:15:17 -080050import static java.lang.String.format;
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -070051import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG;
52import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG_DEFAULT;
Andrea Campanella378e21a2017-06-07 12:09:59 +020053
54/**
Yi Tseng2a340f72018-11-02 16:52:47 -070055 * Default implementation of the GrpcChannelController.
Andrea Campanella378e21a2017-06-07 12:09:59 +020056 */
Ray Milkey5739b2c2018-11-06 14:04:51 -080057@Component(immediate = true, service = GrpcChannelController.class,
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -070058 property = {
Carmelo Cascone73f45302019-02-04 23:11:26 -080059 ENABLE_MESSAGE_LOG + ":Boolean=" + ENABLE_MESSAGE_LOG_DEFAULT,
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -070060 })
Yi Tseng2a340f72018-11-02 16:52:47 -070061public class GrpcChannelControllerImpl implements GrpcChannelController {
Andrea Campanella378e21a2017-06-07 12:09:59 +020062
Ray Milkeyd84f89b2018-08-17 14:54:17 -070063 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020064 protected ComponentConfigService componentConfigService;
65
Carmelo Cascone73f45302019-02-04 23:11:26 -080066 /**
67 * Indicates whether to log gRPC messages.
68 */
69 private final AtomicBoolean enableMessageLog = new AtomicBoolean(
70 ENABLE_MESSAGE_LOG_DEFAULT);
Carmelo Cascone8d99b172017-07-18 17:26:31 -040071
Carmelo Cascone47a853b2018-01-05 02:40:58 +010072 private final Logger log = LoggerFactory.getLogger(getClass());
Carmelo Cascone59f57de2017-07-11 19:55:09 -040073
Andrea Campanella378e21a2017-06-07 12:09:59 +020074 private Map<GrpcChannelId, ManagedChannel> channels;
Carmelo Cascone73f45302019-02-04 23:11:26 -080075 private Map<GrpcChannelId, GrpcLoggingInterceptor> interceptors;
76
Carmelo Cascone158b8c42018-07-04 19:42:37 +020077 private final Striped<Lock> channelLocks = Striped.lock(30);
Andrea Campanella378e21a2017-06-07 12:09:59 +020078
79 @Activate
80 public void activate() {
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020081 componentConfigService.registerProperties(getClass());
Andrea Campanella378e21a2017-06-07 12:09:59 +020082 channels = new ConcurrentHashMap<>();
Carmelo Cascone73f45302019-02-04 23:11:26 -080083 interceptors = new ConcurrentHashMap<>();
Andrea Campanella378e21a2017-06-07 12:09:59 +020084 log.info("Started");
85 }
86
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020087 @Modified
88 public void modified(ComponentContext context) {
89 if (context != null) {
90 Dictionary<?, ?> properties = context.getProperties();
Carmelo Cascone73f45302019-02-04 23:11:26 -080091 enableMessageLog.set(Tools.isPropertyEnabled(
92 properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT));
93 log.info("Configured. Logging of gRPC messages is {}",
94 enableMessageLog.get()
95 ? "ENABLED for new channels"
96 : "DISABLED for new and existing channels");
Andrea Campanellaa74bdba2018-05-15 16:45:00 +020097 }
98 }
99
Andrea Campanella378e21a2017-06-07 12:09:59 +0200100 @Deactivate
101 public void deactivate() {
Andrea Campanellaa74bdba2018-05-15 16:45:00 +0200102 componentConfigService.unregisterProperties(getClass(), false);
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800103 channels.values().forEach(ManagedChannel::shutdownNow);
Andrea Campanella378e21a2017-06-07 12:09:59 +0200104 channels.clear();
Carmelo Cascone73f45302019-02-04 23:11:26 -0800105 channels = null;
106 interceptors.values().forEach(GrpcLoggingInterceptor::close);
107 interceptors.clear();
108 interceptors = null;
Andrea Campanella378e21a2017-06-07 12:09:59 +0200109 log.info("Stopped");
110 }
111
112 @Override
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100113 public ManagedChannel connectChannel(GrpcChannelId channelId,
Carmelo Casconea71b8492018-12-17 17:47:50 -0800114 ManagedChannelBuilder<?> channelBuilder) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200115 checkNotNull(channelId);
116 checkNotNull(channelBuilder);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400117
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200118 Lock lock = channelLocks.get(channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200119 lock.lock();
120
121 try {
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800122 if (channels.containsKey(channelId)) {
123 throw new IllegalArgumentException(format(
124 "A channel with ID '%s' already exists", channelId));
125 }
Carmelo Cascone73f45302019-02-04 23:11:26 -0800126
127 GrpcLoggingInterceptor interceptor = null;
128 if (enableMessageLog.get()) {
129 interceptor = new GrpcLoggingInterceptor(channelId, enableMessageLog);
130 channelBuilder.intercept(interceptor);
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800131 }
Carmelo Casconefb924072017-08-29 20:21:55 +0200132 ManagedChannel channel = channelBuilder.build();
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800133 // Forced connection API is still experimental. Use workaround...
Carmelo Casconefb924072017-08-29 20:21:55 +0200134 // channel.getState(true);
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800135 try {
136 doDummyMessage(channel);
137 } catch (StatusRuntimeException e) {
Carmelo Cascone73f45302019-02-04 23:11:26 -0800138 if (interceptor != null) {
139 interceptor.close();
140 }
Carmelo Casconea71b8492018-12-17 17:47:50 -0800141 shutdownNowAndWait(channel, channelId);
142 throw e;
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800143 }
144 // If here, channel is open.
Carmelo Casconefb924072017-08-29 20:21:55 +0200145 channels.put(channelId, channel);
Carmelo Cascone73f45302019-02-04 23:11:26 -0800146 if (interceptor != null) {
147 interceptors.put(channelId, interceptor);
148 }
Carmelo Casconefb924072017-08-29 20:21:55 +0200149 return channel;
150 } finally {
151 lock.unlock();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400152 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200153 }
154
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800155 private boolean doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100156 DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
157 .newBlockingStub(channel)
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400158 .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
159 try {
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800160 return dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
161 .getDefaultInstance()) != null;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400162 } catch (StatusRuntimeException e) {
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800163 if (e.getStatus().equals(Status.UNIMPLEMENTED)) {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100164 // UNIMPLEMENTED means that the server received our message but
165 // doesn't know how to handle it. Hence, channel is open.
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800166 return true;
167 } else {
168 throw e;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400169 }
170 }
171 }
172
173 @Override
174 public boolean isChannelOpen(GrpcChannelId channelId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200175 checkNotNull(channelId);
176
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200177 Lock lock = channelLocks.get(channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200178 lock.lock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400179
180 try {
Carmelo Casconefb924072017-08-29 20:21:55 +0200181 if (!channels.containsKey(channelId)) {
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800182 log.warn("Unknown channel ID '{}', can't check if channel is open",
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100183 channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200184 return false;
185 }
186 try {
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800187 return doDummyMessage(channels.get(channelId));
188 } catch (StatusRuntimeException e) {
Carmelo Cascone47a853b2018-01-05 02:40:58 +0100189 log.debug("Unable to send dummy message to {}: {}",
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800190 channelId, e.toString());
Carmelo Casconefb924072017-08-29 20:21:55 +0200191 return false;
192 }
193 } finally {
194 lock.unlock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400195 }
196 }
197
Andrea Campanella378e21a2017-06-07 12:09:59 +0200198 @Override
199 public void disconnectChannel(GrpcChannelId channelId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200200 checkNotNull(channelId);
201
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200202 Lock lock = channelLocks.get(channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200203 lock.lock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400204 try {
Carmelo Cascone6d57f322018-12-13 23:15:17 -0800205 final ManagedChannel channel = channels.remove(channelId);
Carmelo Casconea71b8492018-12-17 17:47:50 -0800206 if (channel != null) {
207 shutdownNowAndWait(channel, channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200208 }
Carmelo Cascone73f45302019-02-04 23:11:26 -0800209 final GrpcLoggingInterceptor interceptor = interceptors.remove(channelId);
210 if (interceptor != null) {
211 interceptor.close();
212 }
Carmelo Casconefb924072017-08-29 20:21:55 +0200213 } finally {
214 lock.unlock();
215 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200216 }
217
Carmelo Casconea71b8492018-12-17 17:47:50 -0800218 private void shutdownNowAndWait(ManagedChannel channel, GrpcChannelId channelId) {
219 try {
220 if (!channel.shutdownNow()
221 .awaitTermination(5, TimeUnit.SECONDS)) {
222 log.error("Channel '{}' didn't terminate, although we " +
223 "triggered a shutdown and waited",
224 channelId);
225 }
226 } catch (InterruptedException e) {
227 log.warn("Channel {} didn't shutdown in time", channelId);
228 Thread.currentThread().interrupt();
229 }
230 }
231
Andrea Campanella378e21a2017-06-07 12:09:59 +0200232 @Override
233 public Map<GrpcChannelId, ManagedChannel> getChannels() {
Carmelo Casconefb924072017-08-29 20:21:55 +0200234 return ImmutableMap.copyOf(channels);
Andrea Campanella378e21a2017-06-07 12:09:59 +0200235 }
236
237 @Override
Andrea Campanella378e21a2017-06-07 12:09:59 +0200238 public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
Carmelo Casconefb924072017-08-29 20:21:55 +0200239 checkNotNull(channelId);
240
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200241 Lock lock = channelLocks.get(channelId);
Carmelo Casconefb924072017-08-29 20:21:55 +0200242 lock.lock();
243
244 try {
245 return Optional.ofNullable(channels.get(channelId));
246 } finally {
247 lock.unlock();
248 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200249 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400250
Andrea Campanella378e21a2017-06-07 12:09:59 +0200251}