blob: fb1e7ee5a408d2c9bd4bb3f9683d2f8d9a27a699 [file] [log] [blame]
Andrea Campanella378e21a2017-06-07 12:09:59 +02001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella378e21a2017-06-07 12:09:59 +02003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.grpc.ctl;
18
19import com.google.common.collect.ImmutableSet;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import io.grpc.CallOptions;
21import io.grpc.Channel;
22import io.grpc.ClientCall;
23import io.grpc.ClientInterceptor;
24import io.grpc.ForwardingClientCall;
25import io.grpc.ForwardingClientCallListener;
Andrea Campanella378e21a2017-06-07 12:09:59 +020026import io.grpc.ManagedChannel;
27import io.grpc.ManagedChannelBuilder;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040028import io.grpc.Metadata;
29import io.grpc.MethodDescriptor;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040030import io.grpc.Status;
31import io.grpc.StatusRuntimeException;
Andrea Campanella378e21a2017-06-07 12:09:59 +020032import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
34import org.apache.felix.scr.annotations.Deactivate;
35import org.apache.felix.scr.annotations.Service;
36import org.onosproject.grpc.api.GrpcChannelId;
37import org.onosproject.grpc.api.GrpcController;
38import org.onosproject.grpc.api.GrpcObserverHandler;
39import org.onosproject.grpc.api.GrpcStreamObserverId;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040040import org.onosproject.grpc.ctl.dummy.Dummy;
41import org.onosproject.grpc.ctl.dummy.DummyServiceGrpc;
Andrea Campanella378e21a2017-06-07 12:09:59 +020042import org.onosproject.net.DeviceId;
43import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
Carmelo Cascone59f57de2017-07-11 19:55:09 -040046import java.io.IOException;
Andrea Campanella378e21a2017-06-07 12:09:59 +020047import java.util.Collection;
48import java.util.HashSet;
49import java.util.Map;
50import java.util.Optional;
51import java.util.Set;
52import java.util.concurrent.ConcurrentHashMap;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040053import java.util.concurrent.TimeUnit;
Andrea Campanella378e21a2017-06-07 12:09:59 +020054
55/**
56 * Default implementation of the GrpcController.
57 */
58@Component(immediate = true)
59@Service
60public class GrpcControllerImpl implements GrpcController {
61
Carmelo Cascone8d99b172017-07-18 17:26:31 -040062 // Hint: set to true to log all gRPC messages received/sent on all channels.
63 // TODO: make configurable at runtime
64 public static boolean enableMessageLog = false;
65
Carmelo Cascone59f57de2017-07-11 19:55:09 -040066 private static final int CONNECTION_TIMEOUT_SECONDS = 20;
67
Andrea Campanella378e21a2017-06-07 12:09:59 +020068 public static final Logger log = LoggerFactory
69 .getLogger(GrpcControllerImpl.class);
70
71 private Map<GrpcStreamObserverId, GrpcObserverHandler> observers;
72 private Map<GrpcChannelId, ManagedChannel> channels;
73 private Map<GrpcChannelId, ManagedChannelBuilder<?>> channelBuilders;
74
75 @Activate
76 public void activate() {
77 observers = new ConcurrentHashMap<>();
78 channels = new ConcurrentHashMap<>();
79 channelBuilders = new ConcurrentHashMap<>();
80 log.info("Started");
81 }
82
83 @Deactivate
84 public void deactivate() {
85 channels.values().forEach(ManagedChannel::shutdown);
86 observers.clear();
87 channels.clear();
88 channelBuilders.clear();
89 log.info("Stopped");
90 }
91
92 @Override
93 public void addObserver(GrpcStreamObserverId observerId, GrpcObserverHandler grpcObserverHandler) {
94 grpcObserverHandler.bindObserver(channels.get(observerId.serviceId().channelId()));
95 observers.put(observerId, grpcObserverHandler);
96 }
97
98 @Override
99 public void removeObserver(GrpcStreamObserverId observerId) {
100 observers.get(observerId).removeObserver();
101 observers.remove(observerId);
102 }
103
104 @Override
105 public Optional<GrpcObserverHandler> getObserverManager(GrpcStreamObserverId observerId) {
106 return Optional.ofNullable(observers.get(observerId));
107 }
108
109 @Override
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400110 public ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder)
111 throws IOException {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400112
113 if (enableMessageLog) {
114 channelBuilder.intercept(new InternalLogChannelInterceptor(channelId));
115 }
116
Andrea Campanella378e21a2017-06-07 12:09:59 +0200117 ManagedChannel channel = channelBuilder.build();
118
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400119 // Forced connection not yet implemented. Use workaround...
120 // channel.getState(true);
121 doDummyMessage(channel);
122
Andrea Campanella378e21a2017-06-07 12:09:59 +0200123 channelBuilders.put(channelId, channelBuilder);
124 channels.put(channelId, channel);
125 return channel;
126 }
127
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400128 private void doDummyMessage(ManagedChannel channel) throws IOException {
129 DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc.newBlockingStub(channel)
130 .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
131 try {
132 dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse.getDefaultInstance());
133 } catch (StatusRuntimeException e) {
134 if (e.getStatus() != Status.UNIMPLEMENTED) {
135 // UNIMPLEMENTED means that server received our message but doesn't know how to handle it.
136 // Hence, channel is open.
137 throw new IOException(e);
138 }
139 }
140 }
141
142 @Override
143 public boolean isChannelOpen(GrpcChannelId channelId) {
144 if (!channels.containsKey(channelId)) {
145 log.warn("Can't check if channel open for unknown channel id {}", channelId);
146 return false;
147 }
148
149 try {
150 doDummyMessage(channels.get(channelId));
151 return true;
152 } catch (IOException e) {
153 return false;
154 }
155 }
156
Andrea Campanella378e21a2017-06-07 12:09:59 +0200157 @Override
158 public void disconnectChannel(GrpcChannelId channelId) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400159 if (!channels.containsKey(channelId)) {
160 // Nothing to do.
161 return;
162 }
163 ManagedChannel channel = channels.get(channelId);
164
165 try {
166 channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
167 } catch (InterruptedException e) {
168 log.warn("Channel {} didn't shut down in time.");
169 channel.shutdownNow();
170 }
171
Andrea Campanella378e21a2017-06-07 12:09:59 +0200172 channels.remove(channelId);
173 channelBuilders.remove(channelId);
174 }
175
176 @Override
177 public Map<GrpcChannelId, ManagedChannel> getChannels() {
178 return channels;
179 }
180
181 @Override
182 public Collection<ManagedChannel> getChannels(final DeviceId deviceId) {
183 final Set<ManagedChannel> deviceChannels = new HashSet<>();
184 channels.forEach((k, v) -> {
185 if (k.deviceId().equals(deviceId)) {
186 deviceChannels.add(v);
187 }
188 });
189
190 return ImmutableSet.copyOf(deviceChannels);
191 }
192
193 @Override
194 public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
195 return Optional.ofNullable(channels.get(channelId));
196 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400197
198 /**
199 * gRPC client interceptor that logs all messages sent and received.
200 */
201 private final class InternalLogChannelInterceptor implements ClientInterceptor {
202
203 private final GrpcChannelId channelId;
204
205 private InternalLogChannelInterceptor(GrpcChannelId channelId) {
206 this.channelId = channelId;
207 }
208
209 @Override
210 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
211 CallOptions callOptions, Channel channel) {
212 return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(
213 methodDescriptor, callOptions.withoutWaitForReady())) {
214
215 @Override
216 public void sendMessage(ReqT message) {
217 log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}", channelId, methodDescriptor.getFullMethodName(),
218 message.toString());
219 super.sendMessage(message);
220 }
221
222 @Override
223 public void start(Listener<RespT> responseListener, Metadata headers) {
224
225 ClientCall.Listener<RespT> listener = new ForwardingClientCallListener<RespT>() {
226 @Override
227 protected Listener<RespT> delegate() {
228 return responseListener;
229 }
230
231 @Override
232 public void onMessage(RespT message) {
233 log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}", channelId,
234 methodDescriptor.getFullMethodName(),
235 message.toString());
236 super.onMessage(message);
237 }
238 };
239 super.start(listener, headers);
240 }
241 };
242 }
243 }
Andrea Campanella378e21a2017-06-07 12:09:59 +0200244}