blob: bd133ad150a29e6f461b17b0b45afe10d2ed8f9c [file] [log] [blame]
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.rpc.grpc;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static java.util.stream.Collectors.toList;
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -070020import static org.onosproject.incubator.protobuf.net.ProtobufUtils.translate;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080021import static org.onosproject.net.DeviceId.deviceId;
22
23import java.util.Collection;
24import java.util.List;
25import java.util.concurrent.atomic.AtomicBoolean;
26
HIGUCHI Yutae3e90632016-05-11 16:44:01 -070027import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc;
28import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpcStub;
29import org.onosproject.grpc.net.device.DeviceService.DeviceProviderMsg;
30import org.onosproject.grpc.net.device.DeviceService.DeviceProviderServiceMsg;
31import org.onosproject.grpc.net.device.DeviceService.IsReachableRequest;
32import org.onosproject.grpc.net.device.DeviceService.RoleChanged;
33import org.onosproject.grpc.net.device.DeviceService.TriggerProbe;
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -070034import org.onosproject.incubator.protobuf.net.ProtobufUtils;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080035import org.onosproject.net.DeviceId;
36import org.onosproject.net.MastershipRole;
37import org.onosproject.net.device.DeviceDescription;
38import org.onosproject.net.device.DeviceProvider;
39import org.onosproject.net.device.DeviceProviderService;
40import org.onosproject.net.device.PortDescription;
41import org.onosproject.net.device.PortStatistics;
42import org.onosproject.net.provider.AbstractProviderService;
43import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
46import com.google.common.base.MoreObjects;
47
48import io.grpc.Channel;
49import io.grpc.stub.StreamObserver;
50
51// gRPC Client side
52// gRPC wise, this object represents bidirectional streaming service session
53// and deals with outgoing message stream
54/**
55 * DeviceProviderService instance associated with given DeviceProvider.
56 */
57final class DeviceProviderServiceClientProxy
58 extends AbstractProviderService<DeviceProvider>
59 implements DeviceProviderService {
60
61 private final Logger log = LoggerFactory.getLogger(getClass());
62
63 private final StreamObserver<DeviceProviderServiceMsg> devProvService;
64 private final AtomicBoolean hasShutdown = new AtomicBoolean(false);
65
66 private final Channel channel;
67
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -070068 private Throwable error;
69
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080070 DeviceProviderServiceClientProxy(DeviceProvider provider, Channel channel) {
71 super(provider);
72 this.channel = channel;
73
74 DeviceProviderRegistryRpcStub stub = DeviceProviderRegistryRpcGrpc.newStub(channel);
75 log.debug("Calling RPC register({}) against {}", provider.id(), channel.authority());
76 devProvService = stub.register(new DeviceProviderClientProxy(provider));
77
78 // send initialize message
79 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
80 builder.setRegisterProvider(builder.getRegisterProviderBuilder()
81 .setProviderScheme(provider.id().scheme())
82 .build());
83 devProvService.onNext(builder.build());
84 }
85
86 @Override
87 public void deviceConnected(DeviceId deviceId,
88 DeviceDescription deviceDescription) {
89 checkValidity();
90
91 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
92 builder.setDeviceConnected(builder.getDeviceConnectedBuilder()
93 .setDeviceId(deviceId.toString())
94 .setDeviceDescription(translate(deviceDescription))
95 .build());
96
97 devProvService.onNext(builder.build());
98 }
99
100 @Override
101 public void deviceDisconnected(DeviceId deviceId) {
102 checkValidity();
103
104 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
105 builder.setDeviceDisconnected(builder.getDeviceDisconnectedBuilder()
106 .setDeviceId(deviceId.toString())
107 .build());
108
109 devProvService.onNext(builder.build());
110 }
111
112 @Override
113 public void updatePorts(DeviceId deviceId,
114 List<PortDescription> portDescriptions) {
115 checkValidity();
116
117 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
HIGUCHI Yutae3e90632016-05-11 16:44:01 -0700118 List<org.onosproject.grpc.net.Port.PortDescription> portDescs =
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800119 portDescriptions.stream()
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -0700120 .map(ProtobufUtils::translate)
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800121 .collect(toList());
122
123 builder.setUpdatePorts(builder.getUpdatePortsBuilder()
124 .setDeviceId(deviceId.toString())
125 .addAllPortDescriptions(portDescs)
126 .build());
127
128 devProvService.onNext(builder.build());
129 }
130
131 @Override
Michal Machce774332017-01-25 11:02:55 +0100132 public void deletePort(DeviceId deviceId, PortDescription portDescription) {
133
134 }
135
136 @Override
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800137 public void portStatusChanged(DeviceId deviceId,
138 PortDescription portDescription) {
139 checkValidity();
140
141 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
142 builder.setPortStatusChanged(builder.getPortStatusChangedBuilder()
143 .setDeviceId(deviceId.toString())
144 .setPortDescription(translate(portDescription))
145 .build());
146
147 devProvService.onNext(builder.build());
148 }
149
150 @Override
151 public void receivedRoleReply(DeviceId deviceId, MastershipRole requested,
152 MastershipRole response) {
153 checkValidity();
154
155 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
156 builder.setReceivedRoleReply(builder.getReceivedRoleReplyBuilder()
157 .setDeviceId(deviceId.toString())
158 .setRequested(translate(requested))
159 .setResponse(translate(response))
160 .build());
161
162 devProvService.onNext(builder.build());
163 }
164
165 @Override
166 public void updatePortStatistics(DeviceId deviceId,
167 Collection<PortStatistics> portStatistics) {
168 checkValidity();
169
170 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
HIGUCHI Yutae3e90632016-05-11 16:44:01 -0700171 List<org.onosproject.grpc.net.Port.PortStatistics> portStats =
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800172 portStatistics.stream()
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -0700173 .map(ProtobufUtils::translate)
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800174 .collect(toList());
175 builder.setUpdatePortStatistics(builder.getUpdatePortStatisticsBuilder()
176 .setDeviceId(deviceId.toString())
177 .addAllPortStatistics(portStats)
178 .build());
179
180 devProvService.onNext(builder.build());
181 }
182
183 /**
184 * Shutdown this session.
185 */
186 public void shutdown() {
187 if (hasShutdown.compareAndSet(false, true)) {
188 log.info("Shutting down session over {}", channel.authority());
189 // initiate clean shutdown from client
190 devProvService.onCompleted();
191 invalidate();
192 }
193 }
194
195 /**
196 * Abnormally terminate this session.
197 * @param t error details
198 */
199 public void shutdown(Throwable t) {
200 if (hasShutdown.compareAndSet(false, true)) {
201 log.error("Shutting down session over {}", channel.authority());
202 // initiate abnormal termination from client
203 devProvService.onError(t);
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -0700204 invalidate(t);
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800205 }
206 }
207
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -0700208 /**
209 * Invalidates the ProviderService indicating Failure.
210 * @param t {@link Throwable} describing last failure
211 */
212 private void invalidate(Throwable t) {
213 this.error = t;
214 invalidate();
215 }
216
217 @Override
218 public void checkValidity() {
219 if (error != null) {
220 throw new IllegalStateException("DeviceProviderService no longer valid",
221 error);
222 }
223 super.checkValidity();
224 }
225
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800226 @Override
227 public String toString() {
228 return MoreObjects.toStringHelper(this)
229 .add("channel", channel.authority())
230 .add("hasShutdown", hasShutdown.get())
231 .toString();
232 }
233
234 // gRPC wise, this object handles incoming message stream
235 /**
236 * Translates DeviceProvider instructions received from RPC to Java calls.
237 */
238 private final class DeviceProviderClientProxy
239 implements StreamObserver<DeviceProviderMsg> {
240
241 private final DeviceProvider provider;
242
243 DeviceProviderClientProxy(DeviceProvider provider) {
244 this.provider = checkNotNull(provider);
245 }
246
247 @Override
248 public void onNext(DeviceProviderMsg msg) {
249 try {
250 log.trace("DeviceProviderClientProxy received: {}", msg);
251 onMethod(msg);
252 } catch (Exception e) {
253 log.error("Exception caught handling {} at DeviceProviderClientProxy", msg, e);
254 // initiate shutdown from client
255 shutdown(e);
256 }
257 }
258
259 /**
260 * Translates received RPC message to {@link DeviceProvider} method calls.
261 * @param msg DeviceProvider message
262 */
263 private void onMethod(DeviceProviderMsg msg) {
264 switch (msg.getMethodCase()) {
265 case TRIGGER_PROBE:
266 TriggerProbe triggerProbe = msg.getTriggerProbe();
267 provider.triggerProbe(deviceId(triggerProbe.getDeviceId()));
268 break;
269 case ROLE_CHANGED:
270 RoleChanged roleChanged = msg.getRoleChanged();
271 provider.roleChanged(deviceId(roleChanged.getDeviceId()),
272 translate(roleChanged.getNewRole()));
273 break;
274 case IS_REACHABLE_REQUEST:
275 IsReachableRequest isReachableRequest = msg.getIsReachableRequest();
276 // check if reachable
277 boolean reachable = provider.isReachable(deviceId(isReachableRequest.getDeviceId()));
278
279 int xid = isReachableRequest.getXid();
280 // send response back DeviceProviderService channel
281 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
282 builder.setIsReachableResponse(builder.getIsReachableResponseBuilder()
283 .setXid(xid)
284 .setIsReachable(reachable)
285 .build());
286 devProvService.onNext(builder.build());
287 break;
288
289 case METHOD_NOT_SET:
290 default:
291 log.warn("Unexpected method, ignoring", msg);
292 break;
293 }
294 }
295
296 @Override
297 public void onCompleted() {
298 log.info("DeviceProviderClientProxy completed");
299 // session terminated from remote
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -0700300 invalidate();
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800301 }
302
303 @Override
304 public void onError(Throwable t) {
305 log.error("DeviceProviderClientProxy#onError", t);
306 // session terminated from remote
Yuta HIGUCHIad4861e2016-07-13 19:19:03 -0700307 invalidate(t);
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800308 }
309
310 @Override
311 public String toString() {
312 return MoreObjects.toStringHelper(this)
313 .add("channel", channel.authority())
314 .toString();
315 }
316 }
317}
318