blob: 498011f20ea8264f703029a895fca39c3ae8e403 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.incubator.rpc.grpc;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static java.util.stream.Collectors.toList;
20import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
21import static org.onosproject.net.DeviceId.deviceId;
22
23import java.util.Collection;
24import java.util.List;
25import java.util.concurrent.atomic.AtomicBoolean;
26
27import org.onosproject.grpc.Device.DeviceProviderMsg;
28import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
29import org.onosproject.grpc.Device.IsReachableRequest;
30import org.onosproject.grpc.Device.RoleChanged;
31import org.onosproject.grpc.Device.TriggerProbe;
32import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
33import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpcStub;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.MastershipRole;
36import org.onosproject.net.device.DeviceDescription;
37import org.onosproject.net.device.DeviceProvider;
38import org.onosproject.net.device.DeviceProviderService;
39import org.onosproject.net.device.PortDescription;
40import org.onosproject.net.device.PortStatistics;
41import org.onosproject.net.provider.AbstractProviderService;
42import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
45import com.google.common.base.MoreObjects;
46
47import io.grpc.Channel;
48import io.grpc.stub.StreamObserver;
49
50// gRPC Client side
51// gRPC wise, this object represents bidirectional streaming service session
52// and deals with outgoing message stream
53/**
54 * DeviceProviderService instance associated with given DeviceProvider.
55 */
56final class DeviceProviderServiceClientProxy
57 extends AbstractProviderService<DeviceProvider>
58 implements DeviceProviderService {
59
60 private final Logger log = LoggerFactory.getLogger(getClass());
61
62 private final StreamObserver<DeviceProviderServiceMsg> devProvService;
63 private final AtomicBoolean hasShutdown = new AtomicBoolean(false);
64
65 private final Channel channel;
66
67 DeviceProviderServiceClientProxy(DeviceProvider provider, Channel channel) {
68 super(provider);
69 this.channel = channel;
70
71 DeviceProviderRegistryRpcStub stub = DeviceProviderRegistryRpcGrpc.newStub(channel);
72 log.debug("Calling RPC register({}) against {}", provider.id(), channel.authority());
73 devProvService = stub.register(new DeviceProviderClientProxy(provider));
74
75 // send initialize message
76 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
77 builder.setRegisterProvider(builder.getRegisterProviderBuilder()
78 .setProviderScheme(provider.id().scheme())
79 .build());
80 devProvService.onNext(builder.build());
81 }
82
83 @Override
84 public void deviceConnected(DeviceId deviceId,
85 DeviceDescription deviceDescription) {
86 checkValidity();
87
88 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
89 builder.setDeviceConnected(builder.getDeviceConnectedBuilder()
90 .setDeviceId(deviceId.toString())
91 .setDeviceDescription(translate(deviceDescription))
92 .build());
93
94 devProvService.onNext(builder.build());
95 }
96
97 @Override
98 public void deviceDisconnected(DeviceId deviceId) {
99 checkValidity();
100
101 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
102 builder.setDeviceDisconnected(builder.getDeviceDisconnectedBuilder()
103 .setDeviceId(deviceId.toString())
104 .build());
105
106 devProvService.onNext(builder.build());
107 }
108
109 @Override
110 public void updatePorts(DeviceId deviceId,
111 List<PortDescription> portDescriptions) {
112 checkValidity();
113
114 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
115 List<org.onosproject.grpc.Port.PortDescription> portDescs =
116 portDescriptions.stream()
117 .map(GrpcDeviceUtils::translate)
118 .collect(toList());
119
120 builder.setUpdatePorts(builder.getUpdatePortsBuilder()
121 .setDeviceId(deviceId.toString())
122 .addAllPortDescriptions(portDescs)
123 .build());
124
125 devProvService.onNext(builder.build());
126 }
127
128 @Override
129 public void portStatusChanged(DeviceId deviceId,
130 PortDescription portDescription) {
131 checkValidity();
132
133 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
134 builder.setPortStatusChanged(builder.getPortStatusChangedBuilder()
135 .setDeviceId(deviceId.toString())
136 .setPortDescription(translate(portDescription))
137 .build());
138
139 devProvService.onNext(builder.build());
140 }
141
142 @Override
143 public void receivedRoleReply(DeviceId deviceId, MastershipRole requested,
144 MastershipRole response) {
145 checkValidity();
146
147 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
148 builder.setReceivedRoleReply(builder.getReceivedRoleReplyBuilder()
149 .setDeviceId(deviceId.toString())
150 .setRequested(translate(requested))
151 .setResponse(translate(response))
152 .build());
153
154 devProvService.onNext(builder.build());
155 }
156
157 @Override
158 public void updatePortStatistics(DeviceId deviceId,
159 Collection<PortStatistics> portStatistics) {
160 checkValidity();
161
162 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
163 List<org.onosproject.grpc.Port.PortStatistics> portStats =
164 portStatistics.stream()
165 .map(GrpcDeviceUtils::translate)
166 .collect(toList());
167 builder.setUpdatePortStatistics(builder.getUpdatePortStatisticsBuilder()
168 .setDeviceId(deviceId.toString())
169 .addAllPortStatistics(portStats)
170 .build());
171
172 devProvService.onNext(builder.build());
173 }
174
175 /**
176 * Shutdown this session.
177 */
178 public void shutdown() {
179 if (hasShutdown.compareAndSet(false, true)) {
180 log.info("Shutting down session over {}", channel.authority());
181 // initiate clean shutdown from client
182 devProvService.onCompleted();
183 invalidate();
184 }
185 }
186
187 /**
188 * Abnormally terminate this session.
189 * @param t error details
190 */
191 public void shutdown(Throwable t) {
192 if (hasShutdown.compareAndSet(false, true)) {
193 log.error("Shutting down session over {}", channel.authority());
194 // initiate abnormal termination from client
195 devProvService.onError(t);
196 invalidate();
197 }
198 }
199
200 @Override
201 public String toString() {
202 return MoreObjects.toStringHelper(this)
203 .add("channel", channel.authority())
204 .add("hasShutdown", hasShutdown.get())
205 .toString();
206 }
207
208 // gRPC wise, this object handles incoming message stream
209 /**
210 * Translates DeviceProvider instructions received from RPC to Java calls.
211 */
212 private final class DeviceProviderClientProxy
213 implements StreamObserver<DeviceProviderMsg> {
214
215 private final DeviceProvider provider;
216
217 DeviceProviderClientProxy(DeviceProvider provider) {
218 this.provider = checkNotNull(provider);
219 }
220
221 @Override
222 public void onNext(DeviceProviderMsg msg) {
223 try {
224 log.trace("DeviceProviderClientProxy received: {}", msg);
225 onMethod(msg);
226 } catch (Exception e) {
227 log.error("Exception caught handling {} at DeviceProviderClientProxy", msg, e);
228 // initiate shutdown from client
229 shutdown(e);
230 }
231 }
232
233 /**
234 * Translates received RPC message to {@link DeviceProvider} method calls.
235 * @param msg DeviceProvider message
236 */
237 private void onMethod(DeviceProviderMsg msg) {
238 switch (msg.getMethodCase()) {
239 case TRIGGER_PROBE:
240 TriggerProbe triggerProbe = msg.getTriggerProbe();
241 provider.triggerProbe(deviceId(triggerProbe.getDeviceId()));
242 break;
243 case ROLE_CHANGED:
244 RoleChanged roleChanged = msg.getRoleChanged();
245 provider.roleChanged(deviceId(roleChanged.getDeviceId()),
246 translate(roleChanged.getNewRole()));
247 break;
248 case IS_REACHABLE_REQUEST:
249 IsReachableRequest isReachableRequest = msg.getIsReachableRequest();
250 // check if reachable
251 boolean reachable = provider.isReachable(deviceId(isReachableRequest.getDeviceId()));
252
253 int xid = isReachableRequest.getXid();
254 // send response back DeviceProviderService channel
255 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
256 builder.setIsReachableResponse(builder.getIsReachableResponseBuilder()
257 .setXid(xid)
258 .setIsReachable(reachable)
259 .build());
260 devProvService.onNext(builder.build());
261 break;
262
263 case METHOD_NOT_SET:
264 default:
265 log.warn("Unexpected method, ignoring", msg);
266 break;
267 }
268 }
269
270 @Override
271 public void onCompleted() {
272 log.info("DeviceProviderClientProxy completed");
273 // session terminated from remote
274 // TODO unregister...? how?
275
276 //devProvService.onCompleted();
277 }
278
279 @Override
280 public void onError(Throwable t) {
281 log.error("DeviceProviderClientProxy#onError", t);
282 // session terminated from remote
283 // TODO unregister...? how?
284 //devProvService.onError(t);
285 }
286
287 @Override
288 public String toString() {
289 return MoreObjects.toStringHelper(this)
290 .add("channel", channel.authority())
291 .toString();
292 }
293 }
294}
295