blob: a6712b9539d678eeab11ea89d6e05672b1020252 [file] [log] [blame]
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.rpc.grpc;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static java.util.stream.Collectors.toList;
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -070020import static org.onosproject.incubator.protobuf.net.ProtobufUtils.translate;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080021import static org.onosproject.net.DeviceId.deviceId;
22
23import java.util.Collection;
24import java.util.List;
25import java.util.concurrent.atomic.AtomicBoolean;
26
HIGUCHI Yutae3e90632016-05-11 16:44:01 -070027import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc;
28import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpcStub;
29import org.onosproject.grpc.net.device.DeviceService.DeviceProviderMsg;
30import org.onosproject.grpc.net.device.DeviceService.DeviceProviderServiceMsg;
31import org.onosproject.grpc.net.device.DeviceService.IsReachableRequest;
32import org.onosproject.grpc.net.device.DeviceService.RoleChanged;
33import org.onosproject.grpc.net.device.DeviceService.TriggerProbe;
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -070034import org.onosproject.incubator.protobuf.net.ProtobufUtils;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080035import org.onosproject.net.DeviceId;
36import org.onosproject.net.MastershipRole;
37import org.onosproject.net.device.DeviceDescription;
38import org.onosproject.net.device.DeviceProvider;
39import org.onosproject.net.device.DeviceProviderService;
40import org.onosproject.net.device.PortDescription;
41import org.onosproject.net.device.PortStatistics;
42import org.onosproject.net.provider.AbstractProviderService;
43import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
46import com.google.common.base.MoreObjects;
47
48import io.grpc.Channel;
49import io.grpc.stub.StreamObserver;
50
51// gRPC Client side
52// gRPC wise, this object represents bidirectional streaming service session
53// and deals with outgoing message stream
54/**
55 * DeviceProviderService instance associated with given DeviceProvider.
56 */
57final class DeviceProviderServiceClientProxy
58 extends AbstractProviderService<DeviceProvider>
59 implements DeviceProviderService {
60
61 private final Logger log = LoggerFactory.getLogger(getClass());
62
63 private final StreamObserver<DeviceProviderServiceMsg> devProvService;
64 private final AtomicBoolean hasShutdown = new AtomicBoolean(false);
65
66 private final Channel channel;
67
68 DeviceProviderServiceClientProxy(DeviceProvider provider, Channel channel) {
69 super(provider);
70 this.channel = channel;
71
72 DeviceProviderRegistryRpcStub stub = DeviceProviderRegistryRpcGrpc.newStub(channel);
73 log.debug("Calling RPC register({}) against {}", provider.id(), channel.authority());
74 devProvService = stub.register(new DeviceProviderClientProxy(provider));
75
76 // send initialize message
77 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
78 builder.setRegisterProvider(builder.getRegisterProviderBuilder()
79 .setProviderScheme(provider.id().scheme())
80 .build());
81 devProvService.onNext(builder.build());
82 }
83
84 @Override
85 public void deviceConnected(DeviceId deviceId,
86 DeviceDescription deviceDescription) {
87 checkValidity();
88
89 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
90 builder.setDeviceConnected(builder.getDeviceConnectedBuilder()
91 .setDeviceId(deviceId.toString())
92 .setDeviceDescription(translate(deviceDescription))
93 .build());
94
95 devProvService.onNext(builder.build());
96 }
97
98 @Override
99 public void deviceDisconnected(DeviceId deviceId) {
100 checkValidity();
101
102 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
103 builder.setDeviceDisconnected(builder.getDeviceDisconnectedBuilder()
104 .setDeviceId(deviceId.toString())
105 .build());
106
107 devProvService.onNext(builder.build());
108 }
109
110 @Override
111 public void updatePorts(DeviceId deviceId,
112 List<PortDescription> portDescriptions) {
113 checkValidity();
114
115 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
HIGUCHI Yutae3e90632016-05-11 16:44:01 -0700116 List<org.onosproject.grpc.net.Port.PortDescription> portDescs =
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800117 portDescriptions.stream()
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -0700118 .map(ProtobufUtils::translate)
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800119 .collect(toList());
120
121 builder.setUpdatePorts(builder.getUpdatePortsBuilder()
122 .setDeviceId(deviceId.toString())
123 .addAllPortDescriptions(portDescs)
124 .build());
125
126 devProvService.onNext(builder.build());
127 }
128
129 @Override
130 public void portStatusChanged(DeviceId deviceId,
131 PortDescription portDescription) {
132 checkValidity();
133
134 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
135 builder.setPortStatusChanged(builder.getPortStatusChangedBuilder()
136 .setDeviceId(deviceId.toString())
137 .setPortDescription(translate(portDescription))
138 .build());
139
140 devProvService.onNext(builder.build());
141 }
142
143 @Override
144 public void receivedRoleReply(DeviceId deviceId, MastershipRole requested,
145 MastershipRole response) {
146 checkValidity();
147
148 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
149 builder.setReceivedRoleReply(builder.getReceivedRoleReplyBuilder()
150 .setDeviceId(deviceId.toString())
151 .setRequested(translate(requested))
152 .setResponse(translate(response))
153 .build());
154
155 devProvService.onNext(builder.build());
156 }
157
158 @Override
159 public void updatePortStatistics(DeviceId deviceId,
160 Collection<PortStatistics> portStatistics) {
161 checkValidity();
162
163 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
HIGUCHI Yutae3e90632016-05-11 16:44:01 -0700164 List<org.onosproject.grpc.net.Port.PortStatistics> portStats =
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800165 portStatistics.stream()
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -0700166 .map(ProtobufUtils::translate)
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800167 .collect(toList());
168 builder.setUpdatePortStatistics(builder.getUpdatePortStatisticsBuilder()
169 .setDeviceId(deviceId.toString())
170 .addAllPortStatistics(portStats)
171 .build());
172
173 devProvService.onNext(builder.build());
174 }
175
176 /**
177 * Shutdown this session.
178 */
179 public void shutdown() {
180 if (hasShutdown.compareAndSet(false, true)) {
181 log.info("Shutting down session over {}", channel.authority());
182 // initiate clean shutdown from client
183 devProvService.onCompleted();
184 invalidate();
185 }
186 }
187
188 /**
189 * Abnormally terminate this session.
190 * @param t error details
191 */
192 public void shutdown(Throwable t) {
193 if (hasShutdown.compareAndSet(false, true)) {
194 log.error("Shutting down session over {}", channel.authority());
195 // initiate abnormal termination from client
196 devProvService.onError(t);
197 invalidate();
198 }
199 }
200
201 @Override
202 public String toString() {
203 return MoreObjects.toStringHelper(this)
204 .add("channel", channel.authority())
205 .add("hasShutdown", hasShutdown.get())
206 .toString();
207 }
208
209 // gRPC wise, this object handles incoming message stream
210 /**
211 * Translates DeviceProvider instructions received from RPC to Java calls.
212 */
213 private final class DeviceProviderClientProxy
214 implements StreamObserver<DeviceProviderMsg> {
215
216 private final DeviceProvider provider;
217
218 DeviceProviderClientProxy(DeviceProvider provider) {
219 this.provider = checkNotNull(provider);
220 }
221
222 @Override
223 public void onNext(DeviceProviderMsg msg) {
224 try {
225 log.trace("DeviceProviderClientProxy received: {}", msg);
226 onMethod(msg);
227 } catch (Exception e) {
228 log.error("Exception caught handling {} at DeviceProviderClientProxy", msg, e);
229 // initiate shutdown from client
230 shutdown(e);
231 }
232 }
233
234 /**
235 * Translates received RPC message to {@link DeviceProvider} method calls.
236 * @param msg DeviceProvider message
237 */
238 private void onMethod(DeviceProviderMsg msg) {
239 switch (msg.getMethodCase()) {
240 case TRIGGER_PROBE:
241 TriggerProbe triggerProbe = msg.getTriggerProbe();
242 provider.triggerProbe(deviceId(triggerProbe.getDeviceId()));
243 break;
244 case ROLE_CHANGED:
245 RoleChanged roleChanged = msg.getRoleChanged();
246 provider.roleChanged(deviceId(roleChanged.getDeviceId()),
247 translate(roleChanged.getNewRole()));
248 break;
249 case IS_REACHABLE_REQUEST:
250 IsReachableRequest isReachableRequest = msg.getIsReachableRequest();
251 // check if reachable
252 boolean reachable = provider.isReachable(deviceId(isReachableRequest.getDeviceId()));
253
254 int xid = isReachableRequest.getXid();
255 // send response back DeviceProviderService channel
256 DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
257 builder.setIsReachableResponse(builder.getIsReachableResponseBuilder()
258 .setXid(xid)
259 .setIsReachable(reachable)
260 .build());
261 devProvService.onNext(builder.build());
262 break;
263
264 case METHOD_NOT_SET:
265 default:
266 log.warn("Unexpected method, ignoring", msg);
267 break;
268 }
269 }
270
271 @Override
272 public void onCompleted() {
273 log.info("DeviceProviderClientProxy completed");
274 // session terminated from remote
275 // TODO unregister...? how?
276
277 //devProvService.onCompleted();
278 }
279
280 @Override
281 public void onError(Throwable t) {
282 log.error("DeviceProviderClientProxy#onError", t);
283 // session terminated from remote
284 // TODO unregister...? how?
285 //devProvService.onError(t);
286 }
287
288 @Override
289 public String toString() {
290 return MoreObjects.toStringHelper(this)
291 .add("channel", channel.authority())
292 .toString();
293 }
294 }
295}
296