blob: 5438c674fb021a24c072eca42447b45bf0c5fe38 [file] [log] [blame]
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.rpc.grpc;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static java.util.stream.Collectors.toList;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080020import static org.onosproject.net.DeviceId.deviceId;
21
22import java.util.Collection;
23import java.util.List;
24import java.util.concurrent.atomic.AtomicBoolean;
25
HIGUCHI Yutae3e90632016-05-11 16:44:01 -070026import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc;
27import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpcStub;
28import org.onosproject.grpc.net.device.DeviceService.DeviceProviderMsg;
29import org.onosproject.grpc.net.device.DeviceService.DeviceProviderServiceMsg;
30import org.onosproject.grpc.net.device.DeviceService.IsReachableRequest;
31import org.onosproject.grpc.net.device.DeviceService.RoleChanged;
32import org.onosproject.grpc.net.device.DeviceService.TriggerProbe;
Jian Lic9b4bf12017-06-26 23:50:32 +090033import org.onosproject.grpc.net.device.models.PortDescriptionProtoOuterClass.PortDescriptionProto;
34import org.onosproject.grpc.net.device.models.PortStatisticsProtoOuterClass.PortStatisticsProto;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080035import org.onosproject.net.DeviceId;
36import org.onosproject.net.MastershipRole;
37import org.onosproject.net.device.DeviceDescription;
38import org.onosproject.net.device.DeviceProvider;
39import org.onosproject.net.device.DeviceProviderService;
40import org.onosproject.net.device.PortDescription;
41import org.onosproject.net.device.PortStatistics;
42import org.onosproject.net.provider.AbstractProviderService;
Jian Lid40252c2017-12-08 04:13:20 +090043import org.onosproject.incubator.protobuf.models.net.device.DeviceProtoTranslator;
44import org.onosproject.incubator.protobuf.models.net.device.PortProtoTranslator;
45import org.onosproject.incubator.protobuf.models.net.MastershipRoleProtoTranslator;
46
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080047import org.slf4j.Logger;
48import org.slf4j.LoggerFactory;
49
50import com.google.common.base.MoreObjects;
51
52import io.grpc.Channel;
53import io.grpc.stub.StreamObserver;
54
55// gRPC Client side
56// gRPC wise, this object represents bidirectional streaming service session
57// and deals with outgoing message stream
58/**
59 * DeviceProviderService instance associated with given DeviceProvider.
60 */
61final class DeviceProviderServiceClientProxy
62 extends AbstractProviderService<DeviceProvider>
63 implements DeviceProviderService {
64
65 private final Logger log = LoggerFactory.getLogger(getClass());
66
67 private final StreamObserver<DeviceProviderServiceMsg> devProvService;
68 private final AtomicBoolean hasShutdown = new AtomicBoolean(false);
69
70 private final Channel channel;
71
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -070072 private Throwable error;
73
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080074 DeviceProviderServiceClientProxy(DeviceProvider provider, Channel channel) {
75 super(provider);
76 this.channel = channel;
77
78 DeviceProviderRegistryRpcStub stub = DeviceProviderRegistryRpcGrpc.newStub(channel);
79 log.debug("Calling RPC register({}) against {}", provider.id(), channel.authority());
80 devProvService = stub.register(new DeviceProviderClientProxy(provider));
81
82 // send initialize message
83 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
84 builder.setRegisterProvider(builder.getRegisterProviderBuilder()
85 .setProviderScheme(provider.id().scheme())
86 .build());
87 devProvService.onNext(builder.build());
88 }
89
90 @Override
91 public void deviceConnected(DeviceId deviceId,
92 DeviceDescription deviceDescription) {
93 checkValidity();
94
95 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
96 builder.setDeviceConnected(builder.getDeviceConnectedBuilder()
97 .setDeviceId(deviceId.toString())
Jian Lid40252c2017-12-08 04:13:20 +090098 .setDeviceDescription(DeviceProtoTranslator.translate(deviceDescription))
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080099 .build());
100
101 devProvService.onNext(builder.build());
102 }
103
104 @Override
105 public void deviceDisconnected(DeviceId deviceId) {
106 checkValidity();
107
108 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
109 builder.setDeviceDisconnected(builder.getDeviceDisconnectedBuilder()
110 .setDeviceId(deviceId.toString())
111 .build());
112
113 devProvService.onNext(builder.build());
114 }
115
116 @Override
117 public void updatePorts(DeviceId deviceId,
118 List<PortDescription> portDescriptions) {
119 checkValidity();
120
121 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
Jian Lic9b4bf12017-06-26 23:50:32 +0900122 List<PortDescriptionProto> portDescs =
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800123 portDescriptions.stream()
Jian Lid40252c2017-12-08 04:13:20 +0900124 .map(PortProtoTranslator::translate)
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800125 .collect(toList());
126
127 builder.setUpdatePorts(builder.getUpdatePortsBuilder()
128 .setDeviceId(deviceId.toString())
129 .addAllPortDescriptions(portDescs)
130 .build());
131
132 devProvService.onNext(builder.build());
133 }
134
135 @Override
Michal Machce774332017-01-25 11:02:55 +0100136 public void deletePort(DeviceId deviceId, PortDescription portDescription) {
137
138 }
139
140 @Override
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800141 public void portStatusChanged(DeviceId deviceId,
142 PortDescription portDescription) {
143 checkValidity();
144
145 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
146 builder.setPortStatusChanged(builder.getPortStatusChangedBuilder()
147 .setDeviceId(deviceId.toString())
Jian Lid40252c2017-12-08 04:13:20 +0900148 .setPortDescription(PortProtoTranslator.translate(portDescription))
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800149 .build());
150
151 devProvService.onNext(builder.build());
152 }
153
154 @Override
155 public void receivedRoleReply(DeviceId deviceId, MastershipRole requested,
156 MastershipRole response) {
157 checkValidity();
158
159 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
160 builder.setReceivedRoleReply(builder.getReceivedRoleReplyBuilder()
161 .setDeviceId(deviceId.toString())
Jian Lid40252c2017-12-08 04:13:20 +0900162 .setRequested(MastershipRoleProtoTranslator.translate(requested))
163 .setResponse(MastershipRoleProtoTranslator.translate(response))
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800164 .build());
165
166 devProvService.onNext(builder.build());
167 }
168
169 @Override
170 public void updatePortStatistics(DeviceId deviceId,
171 Collection<PortStatistics> portStatistics) {
172 checkValidity();
173
174 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
Jian Lic9b4bf12017-06-26 23:50:32 +0900175 List<PortStatisticsProto> portStats =
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800176 portStatistics.stream()
Jian Lid40252c2017-12-08 04:13:20 +0900177 .map(PortProtoTranslator::translate)
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800178 .collect(toList());
179 builder.setUpdatePortStatistics(builder.getUpdatePortStatisticsBuilder()
180 .setDeviceId(deviceId.toString())
181 .addAllPortStatistics(portStats)
182 .build());
183
184 devProvService.onNext(builder.build());
185 }
186
187 /**
188 * Shutdown this session.
189 */
190 public void shutdown() {
191 if (hasShutdown.compareAndSet(false, true)) {
192 log.info("Shutting down session over {}", channel.authority());
193 // initiate clean shutdown from client
194 devProvService.onCompleted();
195 invalidate();
196 }
197 }
198
199 /**
200 * Abnormally terminate this session.
201 * @param t error details
202 */
203 public void shutdown(Throwable t) {
204 if (hasShutdown.compareAndSet(false, true)) {
205 log.error("Shutting down session over {}", channel.authority());
206 // initiate abnormal termination from client
207 devProvService.onError(t);
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -0700208 invalidate(t);
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800209 }
210 }
211
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -0700212 /**
213 * Invalidates the ProviderService indicating Failure.
214 * @param t {@link Throwable} describing last failure
215 */
216 private void invalidate(Throwable t) {
217 this.error = t;
218 invalidate();
219 }
220
221 @Override
222 public void checkValidity() {
223 if (error != null) {
224 throw new IllegalStateException("DeviceProviderService no longer valid",
225 error);
226 }
227 super.checkValidity();
228 }
229
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800230 @Override
231 public String toString() {
232 return MoreObjects.toStringHelper(this)
233 .add("channel", channel.authority())
234 .add("hasShutdown", hasShutdown.get())
235 .toString();
236 }
237
238 // gRPC wise, this object handles incoming message stream
239 /**
240 * Translates DeviceProvider instructions received from RPC to Java calls.
241 */
242 private final class DeviceProviderClientProxy
243 implements StreamObserver<DeviceProviderMsg> {
244
245 private final DeviceProvider provider;
246
247 DeviceProviderClientProxy(DeviceProvider provider) {
248 this.provider = checkNotNull(provider);
249 }
250
251 @Override
252 public void onNext(DeviceProviderMsg msg) {
253 try {
254 log.trace("DeviceProviderClientProxy received: {}", msg);
255 onMethod(msg);
256 } catch (Exception e) {
257 log.error("Exception caught handling {} at DeviceProviderClientProxy", msg, e);
258 // initiate shutdown from client
259 shutdown(e);
260 }
261 }
262
263 /**
264 * Translates received RPC message to {@link DeviceProvider} method calls.
265 * @param msg DeviceProvider message
266 */
267 private void onMethod(DeviceProviderMsg msg) {
268 switch (msg.getMethodCase()) {
269 case TRIGGER_PROBE:
270 TriggerProbe triggerProbe = msg.getTriggerProbe();
271 provider.triggerProbe(deviceId(triggerProbe.getDeviceId()));
272 break;
273 case ROLE_CHANGED:
274 RoleChanged roleChanged = msg.getRoleChanged();
275 provider.roleChanged(deviceId(roleChanged.getDeviceId()),
Jian Lid40252c2017-12-08 04:13:20 +0900276 (MastershipRole) MastershipRoleProtoTranslator.translate(roleChanged.getNewRole()).get());
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800277 break;
278 case IS_REACHABLE_REQUEST:
279 IsReachableRequest isReachableRequest = msg.getIsReachableRequest();
280 // check if reachable
281 boolean reachable = provider.isReachable(deviceId(isReachableRequest.getDeviceId()));
282
283 int xid = isReachableRequest.getXid();
284 // send response back DeviceProviderService channel
285 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
286 builder.setIsReachableResponse(builder.getIsReachableResponseBuilder()
287 .setXid(xid)
288 .setIsReachable(reachable)
289 .build());
290 devProvService.onNext(builder.build());
291 break;
292
293 case METHOD_NOT_SET:
294 default:
295 log.warn("Unexpected method, ignoring", msg);
296 break;
297 }
298 }
299
300 @Override
301 public void onCompleted() {
302 log.info("DeviceProviderClientProxy completed");
303 // session terminated from remote
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -0700304 invalidate();
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800305 }
306
307 @Override
308 public void onError(Throwable t) {
309 log.error("DeviceProviderClientProxy#onError", t);
310 // session terminated from remote
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -0700311 invalidate(t);
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800312 }
313
314 @Override
315 public String toString() {
316 return MoreObjects.toStringHelper(this)
317 .add("channel", channel.authority())
318 .toString();
319 }
320 }
321}
322