blob: d11a834490f9f534b9895db9f367bd2cbe2f53e5 [file] [log] [blame]
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.rpc.grpc;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static java.util.stream.Collectors.toList;
Aaron Kruglikov9f95f992017-06-23 14:15:25 +090020import static org.onosproject.incubator.protobuf.models.ProtobufUtils.translate;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080021import static org.onosproject.net.DeviceId.deviceId;
22
23import java.util.Collection;
24import java.util.List;
25import java.util.concurrent.atomic.AtomicBoolean;
26
HIGUCHI Yutae3e90632016-05-11 16:44:01 -070027import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc;
28import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpcStub;
29import org.onosproject.grpc.net.device.DeviceService.DeviceProviderMsg;
30import org.onosproject.grpc.net.device.DeviceService.DeviceProviderServiceMsg;
31import org.onosproject.grpc.net.device.DeviceService.IsReachableRequest;
32import org.onosproject.grpc.net.device.DeviceService.RoleChanged;
33import org.onosproject.grpc.net.device.DeviceService.TriggerProbe;
Jian Lic9b4bf12017-06-26 23:50:32 +090034import org.onosproject.grpc.net.device.models.PortDescriptionProtoOuterClass.PortDescriptionProto;
35import org.onosproject.grpc.net.device.models.PortStatisticsProtoOuterClass.PortStatisticsProto;
Aaron Kruglikov9f95f992017-06-23 14:15:25 +090036import org.onosproject.incubator.protobuf.models.ProtobufUtils;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080037import org.onosproject.net.DeviceId;
38import org.onosproject.net.MastershipRole;
39import org.onosproject.net.device.DeviceDescription;
40import org.onosproject.net.device.DeviceProvider;
41import org.onosproject.net.device.DeviceProviderService;
42import org.onosproject.net.device.PortDescription;
43import org.onosproject.net.device.PortStatistics;
44import org.onosproject.net.provider.AbstractProviderService;
45import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
47
48import com.google.common.base.MoreObjects;
49
50import io.grpc.Channel;
51import io.grpc.stub.StreamObserver;
52
53// gRPC Client side
54// gRPC wise, this object represents bidirectional streaming service session
55// and deals with outgoing message stream
56/**
57 * DeviceProviderService instance associated with given DeviceProvider.
58 */
59final class DeviceProviderServiceClientProxy
60 extends AbstractProviderService<DeviceProvider>
61 implements DeviceProviderService {
62
63 private final Logger log = LoggerFactory.getLogger(getClass());
64
65 private final StreamObserver<DeviceProviderServiceMsg> devProvService;
66 private final AtomicBoolean hasShutdown = new AtomicBoolean(false);
67
68 private final Channel channel;
69
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -070070 private Throwable error;
71
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080072 DeviceProviderServiceClientProxy(DeviceProvider provider, Channel channel) {
73 super(provider);
74 this.channel = channel;
75
76 DeviceProviderRegistryRpcStub stub = DeviceProviderRegistryRpcGrpc.newStub(channel);
77 log.debug("Calling RPC register({}) against {}", provider.id(), channel.authority());
78 devProvService = stub.register(new DeviceProviderClientProxy(provider));
79
80 // send initialize message
81 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
82 builder.setRegisterProvider(builder.getRegisterProviderBuilder()
83 .setProviderScheme(provider.id().scheme())
84 .build());
85 devProvService.onNext(builder.build());
86 }
87
88 @Override
89 public void deviceConnected(DeviceId deviceId,
90 DeviceDescription deviceDescription) {
91 checkValidity();
92
93 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
94 builder.setDeviceConnected(builder.getDeviceConnectedBuilder()
95 .setDeviceId(deviceId.toString())
96 .setDeviceDescription(translate(deviceDescription))
97 .build());
98
99 devProvService.onNext(builder.build());
100 }
101
102 @Override
103 public void deviceDisconnected(DeviceId deviceId) {
104 checkValidity();
105
106 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
107 builder.setDeviceDisconnected(builder.getDeviceDisconnectedBuilder()
108 .setDeviceId(deviceId.toString())
109 .build());
110
111 devProvService.onNext(builder.build());
112 }
113
114 @Override
115 public void updatePorts(DeviceId deviceId,
116 List<PortDescription> portDescriptions) {
117 checkValidity();
118
119 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
Jian Lic9b4bf12017-06-26 23:50:32 +0900120 List<PortDescriptionProto> portDescs =
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800121 portDescriptions.stream()
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -0700122 .map(ProtobufUtils::translate)
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800123 .collect(toList());
124
125 builder.setUpdatePorts(builder.getUpdatePortsBuilder()
126 .setDeviceId(deviceId.toString())
127 .addAllPortDescriptions(portDescs)
128 .build());
129
130 devProvService.onNext(builder.build());
131 }
132
133 @Override
Michal Machce774332017-01-25 11:02:55 +0100134 public void deletePort(DeviceId deviceId, PortDescription portDescription) {
135
136 }
137
138 @Override
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800139 public void portStatusChanged(DeviceId deviceId,
140 PortDescription portDescription) {
141 checkValidity();
142
143 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
144 builder.setPortStatusChanged(builder.getPortStatusChangedBuilder()
145 .setDeviceId(deviceId.toString())
146 .setPortDescription(translate(portDescription))
147 .build());
148
149 devProvService.onNext(builder.build());
150 }
151
152 @Override
153 public void receivedRoleReply(DeviceId deviceId, MastershipRole requested,
154 MastershipRole response) {
155 checkValidity();
156
157 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
158 builder.setReceivedRoleReply(builder.getReceivedRoleReplyBuilder()
159 .setDeviceId(deviceId.toString())
160 .setRequested(translate(requested))
161 .setResponse(translate(response))
162 .build());
163
164 devProvService.onNext(builder.build());
165 }
166
167 @Override
168 public void updatePortStatistics(DeviceId deviceId,
169 Collection<PortStatistics> portStatistics) {
170 checkValidity();
171
172 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
Jian Lic9b4bf12017-06-26 23:50:32 +0900173 List<PortStatisticsProto> portStats =
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800174 portStatistics.stream()
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -0700175 .map(ProtobufUtils::translate)
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800176 .collect(toList());
177 builder.setUpdatePortStatistics(builder.getUpdatePortStatisticsBuilder()
178 .setDeviceId(deviceId.toString())
179 .addAllPortStatistics(portStats)
180 .build());
181
182 devProvService.onNext(builder.build());
183 }
184
185 /**
186 * Shutdown this session.
187 */
188 public void shutdown() {
189 if (hasShutdown.compareAndSet(false, true)) {
190 log.info("Shutting down session over {}", channel.authority());
191 // initiate clean shutdown from client
192 devProvService.onCompleted();
193 invalidate();
194 }
195 }
196
197 /**
198 * Abnormally terminate this session.
199 * @param t error details
200 */
201 public void shutdown(Throwable t) {
202 if (hasShutdown.compareAndSet(false, true)) {
203 log.error("Shutting down session over {}", channel.authority());
204 // initiate abnormal termination from client
205 devProvService.onError(t);
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -0700206 invalidate(t);
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800207 }
208 }
209
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -0700210 /**
211 * Invalidates the ProviderService indicating Failure.
212 * @param t {@link Throwable} describing last failure
213 */
214 private void invalidate(Throwable t) {
215 this.error = t;
216 invalidate();
217 }
218
219 @Override
220 public void checkValidity() {
221 if (error != null) {
222 throw new IllegalStateException("DeviceProviderService no longer valid",
223 error);
224 }
225 super.checkValidity();
226 }
227
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800228 @Override
229 public String toString() {
230 return MoreObjects.toStringHelper(this)
231 .add("channel", channel.authority())
232 .add("hasShutdown", hasShutdown.get())
233 .toString();
234 }
235
236 // gRPC wise, this object handles incoming message stream
237 /**
238 * Translates DeviceProvider instructions received from RPC to Java calls.
239 */
240 private final class DeviceProviderClientProxy
241 implements StreamObserver<DeviceProviderMsg> {
242
243 private final DeviceProvider provider;
244
245 DeviceProviderClientProxy(DeviceProvider provider) {
246 this.provider = checkNotNull(provider);
247 }
248
249 @Override
250 public void onNext(DeviceProviderMsg msg) {
251 try {
252 log.trace("DeviceProviderClientProxy received: {}", msg);
253 onMethod(msg);
254 } catch (Exception e) {
255 log.error("Exception caught handling {} at DeviceProviderClientProxy", msg, e);
256 // initiate shutdown from client
257 shutdown(e);
258 }
259 }
260
261 /**
262 * Translates received RPC message to {@link DeviceProvider} method calls.
263 * @param msg DeviceProvider message
264 */
265 private void onMethod(DeviceProviderMsg msg) {
266 switch (msg.getMethodCase()) {
267 case TRIGGER_PROBE:
268 TriggerProbe triggerProbe = msg.getTriggerProbe();
269 provider.triggerProbe(deviceId(triggerProbe.getDeviceId()));
270 break;
271 case ROLE_CHANGED:
272 RoleChanged roleChanged = msg.getRoleChanged();
273 provider.roleChanged(deviceId(roleChanged.getDeviceId()),
274 translate(roleChanged.getNewRole()));
275 break;
276 case IS_REACHABLE_REQUEST:
277 IsReachableRequest isReachableRequest = msg.getIsReachableRequest();
278 // check if reachable
279 boolean reachable = provider.isReachable(deviceId(isReachableRequest.getDeviceId()));
280
281 int xid = isReachableRequest.getXid();
282 // send response back DeviceProviderService channel
283 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
284 builder.setIsReachableResponse(builder.getIsReachableResponseBuilder()
285 .setXid(xid)
286 .setIsReachable(reachable)
287 .build());
288 devProvService.onNext(builder.build());
289 break;
290
291 case METHOD_NOT_SET:
292 default:
293 log.warn("Unexpected method, ignoring", msg);
294 break;
295 }
296 }
297
298 @Override
299 public void onCompleted() {
300 log.info("DeviceProviderClientProxy completed");
301 // session terminated from remote
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -0700302 invalidate();
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800303 }
304
305 @Override
306 public void onError(Throwable t) {
307 log.error("DeviceProviderClientProxy#onError", t);
308 // session terminated from remote
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -0700309 invalidate(t);
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800310 }
311
312 @Override
313 public String toString() {
314 return MoreObjects.toStringHelper(this)
315 .add("channel", channel.authority())
316 .toString();
317 }
318 }
319}
320