blob: 62bdfb2af460be4837ec195d643da6af633023cc [file] [log] [blame]
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.rpc.grpc;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static java.util.stream.Collectors.toList;
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -070020import static org.onosproject.incubator.protobuf.net.ProtobufUtils.translate;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080021import static org.onosproject.net.DeviceId.deviceId;
22
23import java.io.IOException;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080024import java.util.Map;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080025import java.util.Set;
26import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.ExecutionException;
28import java.util.concurrent.TimeUnit;
29import java.util.concurrent.TimeoutException;
30import java.util.concurrent.atomic.AtomicInteger;
31
32import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
34import org.apache.felix.scr.annotations.Deactivate;
35import org.apache.felix.scr.annotations.Modified;
36import org.apache.felix.scr.annotations.Property;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
HIGUCHI Yutae3e90632016-05-11 16:44:01 -070039import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc;
40import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
41import org.onosproject.grpc.net.device.DeviceService.DeviceConnected;
42import org.onosproject.grpc.net.device.DeviceService.DeviceDisconnected;
43import org.onosproject.grpc.net.device.DeviceService.DeviceProviderMsg;
44import org.onosproject.grpc.net.device.DeviceService.DeviceProviderServiceMsg;
45import org.onosproject.grpc.net.device.DeviceService.IsReachableResponse;
46import org.onosproject.grpc.net.device.DeviceService.PortStatusChanged;
47import org.onosproject.grpc.net.device.DeviceService.ReceivedRoleReply;
48import org.onosproject.grpc.net.device.DeviceService.RegisterProvider;
49import org.onosproject.grpc.net.device.DeviceService.UpdatePortStatistics;
50import org.onosproject.grpc.net.device.DeviceService.UpdatePorts;
51import org.onosproject.grpc.net.link.LinkProviderServiceRpcGrpc;
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -070052import org.onosproject.incubator.protobuf.net.ProtobufUtils;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080053import org.onosproject.net.DeviceId;
54import org.onosproject.net.MastershipRole;
Saurav Dasa2d37502016-03-25 17:50:40 -070055import org.onosproject.net.PortNumber;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080056import org.onosproject.net.device.DeviceProvider;
57import org.onosproject.net.device.DeviceProviderRegistry;
58import org.onosproject.net.device.DeviceProviderService;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080059import org.onosproject.net.link.LinkProvider;
60import org.onosproject.net.link.LinkProviderRegistry;
61import org.onosproject.net.link.LinkProviderService;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080062import org.onosproject.net.provider.ProviderId;
63import org.osgi.service.component.ComponentContext;
64import org.slf4j.Logger;
65import org.slf4j.LoggerFactory;
66
67import com.google.common.cache.Cache;
68import com.google.common.cache.CacheBuilder;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080069import com.google.common.collect.Maps;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080070import com.google.common.collect.Sets;
71
72import io.grpc.Server;
73import io.grpc.netty.NettyServerBuilder;
74import io.grpc.stub.StreamObserver;
75
76// gRPC Server on Metro-side
77// Translates request received on RPC channel, and calls corresponding Service on
78// Metro-ONOS cluster.
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080079
80// Currently supports DeviceProviderRegistry, LinkProviderService
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080081/**
82 * Server side implementation of gRPC based RemoteService.
83 */
84@Component(immediate = true)
85public class GrpcRemoteServiceServer {
86
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080087 static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080088
89 // TODO pick a number
90 public static final int DEFAULT_LISTEN_PORT = 11984;
91
92 private final Logger log = LoggerFactory.getLogger(getClass());
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95 protected DeviceProviderRegistry deviceProviderRegistry;
96
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080097 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected LinkProviderRegistry linkProviderRegistry;
99
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800100
101 @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT,
102 label = "Port to listen on")
103 protected int listenPort = DEFAULT_LISTEN_PORT;
104
105 private Server server;
106 private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet();
107
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800108 // scheme -> ...
109 // updates must be guarded by synchronizing `this`
110 private final Map<String, LinkProviderService> linkProviderServices = Maps.newConcurrentMap();
111 private final Map<String, LinkProvider> linkProviders = Maps.newConcurrentMap();
112
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800113 @Activate
114 protected void activate(ComponentContext context) throws IOException {
115 modified(context);
116
117 log.debug("Server starting on {}", listenPort);
118 try {
119 server = NettyServerBuilder.forPort(listenPort)
120 .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy()))
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800121 .addService(LinkProviderServiceRpcGrpc.bindService(new LinkProviderServiceServerProxy(this)))
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800122 .build().start();
123 } catch (IOException e) {
124 log.error("Failed to start gRPC server", e);
125 throw e;
126 }
127
128 log.info("Started on {}", listenPort);
129 }
130
131 @Deactivate
132 protected void deactivate() {
133
134 registeredProviders.stream()
135 .forEach(deviceProviderRegistry::unregister);
136
137 server.shutdown();
138 // Should we wait for shutdown?
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800139
140 unregisterLinkProviders();
141
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800142 log.info("Stopped");
143 }
144
145 @Modified
146 public void modified(ComponentContext context) {
147 // TODO support dynamic reconfiguration and restarting server?
148 }
149
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800150 /**
151 * Registers {@link StubLinkProvider} for given ProviderId scheme.
152 *
153 * DO NOT DIRECTLY CALL THIS METHOD.
154 * Only expected to be called from {@link #getLinkProviderServiceFor(String)}.
155 *
156 * @param scheme ProviderId scheme.
157 * @return {@link LinkProviderService} registered.
158 */
159 private synchronized LinkProviderService registerStubLinkProvider(String scheme) {
160 StubLinkProvider provider = new StubLinkProvider(scheme);
161 linkProviders.put(scheme, provider);
162 return linkProviderRegistry.register(provider);
163 }
164
165 /**
166 * Unregisters all registered LinkProviders.
167 */
168 private synchronized void unregisterLinkProviders() {
HIGUCHI Yuta6381a242016-03-13 23:29:10 -0700169 // TODO remove all links registered by these providers
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800170 linkProviders.values().forEach(linkProviderRegistry::unregister);
171 linkProviders.clear();
172 linkProviderServices.clear();
173 }
174
175 /**
176 * Gets or creates {@link LinkProviderService} registered for given ProviderId scheme.
177 *
178 * @param scheme ProviderId scheme.
179 * @return {@link LinkProviderService}
180 */
181 protected LinkProviderService getLinkProviderServiceFor(String scheme) {
182 return linkProviderServices.computeIfAbsent(scheme, this::registerStubLinkProvider);
183 }
184
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800185 // RPC Server-side code
186 // RPC session Factory
187 /**
188 * Relays DeviceProviderRegistry calls from RPC client.
189 */
190 class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc {
191
192 @Override
193 public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
194 log.trace("DeviceProviderRegistryServerProxy#register called!");
195
196 DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider);
197
198 return new DeviceProviderServiceServerProxy(provider, toDeviceProvider);
199 }
200 }
201
202 // Lower -> Upper Controller message
203 // RPC Server-side code
204 // RPC session handler
205 private final class DeviceProviderServiceServerProxy
206 implements StreamObserver<DeviceProviderServiceMsg> {
207
208 // intentionally shadowing
209 private final Logger log = LoggerFactory.getLogger(getClass());
210
211 private final DeviceProviderServerProxy pairedProvider;
212 private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
213
214 private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable;
215
216 // wrapped providerService
217 private DeviceProviderService deviceProviderService;
218
219
220 DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider,
221 StreamObserver<DeviceProviderMsg> toDeviceProvider) {
222 this.pairedProvider = provider;
223 this.toDeviceProvider = toDeviceProvider;
224 outstandingIsReachable = CacheBuilder.newBuilder()
225 .expireAfterWrite(1, TimeUnit.MINUTES)
226 .build();
227
228 // pair RPC session in other direction
229 provider.pair(this);
230 }
231
232 @Override
233 public void onNext(DeviceProviderServiceMsg msg) {
234 try {
235 log.trace("DeviceProviderServiceServerProxy received: {}", msg);
236 onMethod(msg);
237 } catch (Exception e) {
238 log.error("Exception thrown handling {}", msg, e);
239 onError(e);
240 throw e;
241 }
242 }
243
244 /**
245 * Translates received RPC message to {@link DeviceProviderService} method calls.
246 * @param msg DeviceProviderService message
247 */
248 private void onMethod(DeviceProviderServiceMsg msg) {
249 switch (msg.getMethodCase()) {
250 case REGISTER_PROVIDER:
251 RegisterProvider registerProvider = msg.getRegisterProvider();
252 // TODO Do we care about provider name?
253 pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME));
254 registeredProviders.add(pairedProvider);
HIGUCHI Yuta6381a242016-03-13 23:29:10 -0700255 log.info("registering DeviceProvider {} via gRPC", pairedProvider.id());
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800256 deviceProviderService = deviceProviderRegistry.register(pairedProvider);
257 break;
258
259 case DEVICE_CONNECTED:
260 DeviceConnected deviceConnected = msg.getDeviceConnected();
261 deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()),
262 translate(deviceConnected.getDeviceDescription()));
263 break;
264 case DEVICE_DISCONNECTED:
265 DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected();
266 deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId()));
267 break;
268 case UPDATE_PORTS:
269 UpdatePorts updatePorts = msg.getUpdatePorts();
270 deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()),
271 updatePorts.getPortDescriptionsList()
272 .stream()
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -0700273 .map(ProtobufUtils::translate)
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800274 .collect(toList()));
275 break;
276 case PORT_STATUS_CHANGED:
277 PortStatusChanged portStatusChanged = msg.getPortStatusChanged();
278 deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()),
279 translate(portStatusChanged.getPortDescription()));
280 break;
281 case RECEIVED_ROLE_REPLY:
282 ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply();
283 deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()),
284 translate(receivedRoleReply.getRequested()),
285 translate(receivedRoleReply.getResponse()));
286 break;
287 case UPDATE_PORT_STATISTICS:
288 UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics();
289 deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()),
290 updatePortStatistics.getPortStatisticsList()
291 .stream()
HIGUCHI Yuta06c1a3f2016-05-23 12:54:55 -0700292 .map(ProtobufUtils::translate)
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800293 .collect(toList()));
294 break;
295
296 // return value of DeviceProvider#isReachable
297 case IS_REACHABLE_RESPONSE:
298 IsReachableResponse isReachableResponse = msg.getIsReachableResponse();
299 int xid = isReachableResponse.getXid();
300 boolean isReachable = isReachableResponse.getIsReachable();
301 CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid);
302 if (result != null) {
303 result.complete(isReachable);
304 }
305 break;
306
307 case METHOD_NOT_SET:
308 default:
309 log.warn("Unexpected message received {}", msg);
310 break;
311 }
312 }
313
314 @Override
315 public void onCompleted() {
316 log.info("DeviceProviderServiceServerProxy completed");
317 deviceProviderRegistry.unregister(pairedProvider);
318 registeredProviders.remove(pairedProvider);
319 toDeviceProvider.onCompleted();
320 }
321
322 @Override
323 public void onError(Throwable e) {
324 log.error("DeviceProviderServiceServerProxy#onError", e);
HIGUCHI Yuta6381a242016-03-13 23:29:10 -0700325 if (pairedProvider != null) {
326 // TODO call deviceDisconnected against all devices
327 // registered for this provider scheme
328 log.info("unregistering DeviceProvider {} via gRPC", pairedProvider.id());
329 deviceProviderRegistry.unregister(pairedProvider);
330 registeredProviders.remove(pairedProvider);
331 }
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800332 // TODO What is the proper clean up for bi-di stream on error?
333 // sample suggests no-op
334 toDeviceProvider.onError(e);
335 }
336
337
338 /**
339 * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value.
340 * @param xid IsReachable call ID.
341 * @param reply Future to
342 */
343 void register(int xid, CompletableFuture<Boolean> reply) {
344 outstandingIsReachable.put(xid, reply);
345 }
346
347 }
348
349 // Upper -> Lower Controller message
350 /**
351 * Relay DeviceProvider calls to RPC client.
352 */
353 private final class DeviceProviderServerProxy
354 implements DeviceProvider {
355
356 private final Logger log = LoggerFactory.getLogger(getClass());
357
358 // xid for isReachable calls
359 private final AtomicInteger xidPool = new AtomicInteger();
360 private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
361
362 private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null;
363 private ProviderId providerId;
364
365 DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
366 this.toDeviceProvider = toDeviceProvider;
367 }
368
369 void setProviderId(ProviderId pid) {
370 this.providerId = pid;
371 }
372
373 /**
374 * Registers RPC stream in other direction.
HIGUCHI Yuta6381a242016-03-13 23:29:10 -0700375 *
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800376 * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy}
377 */
378 void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) {
379 this.deviceProviderServiceProxy = deviceProviderServiceProxy;
380 }
381
382 @Override
383 public void triggerProbe(DeviceId deviceId) {
384 log.trace("triggerProbe({})", deviceId);
385 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
386 msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder()
387 .setDeviceId(deviceId.toString())
388 .build());
389 DeviceProviderMsg triggerProbeMsg = msgBuilder.build();
390 toDeviceProvider.onNext(triggerProbeMsg);
391 // TODO Catch Exceptions and call onError()
392 }
393
394 @Override
395 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
396 log.trace("roleChanged({}, {})", deviceId, newRole);
397 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
398 msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder()
399 .setDeviceId(deviceId.toString())
400 .setNewRole(translate(newRole))
401 .build());
402 toDeviceProvider.onNext(msgBuilder.build());
403 // TODO Catch Exceptions and call onError()
404 }
405
406 @Override
407 public boolean isReachable(DeviceId deviceId) {
408 log.trace("isReachable({})", deviceId);
409 CompletableFuture<Boolean> result = new CompletableFuture<>();
410 final int xid = xidPool.incrementAndGet();
411
412 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
413 msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder()
414 .setXid(xid)
415 .setDeviceId(deviceId.toString())
416 .build());
417
418 // Associate xid and register above future some where
419 // in DeviceProviderService channel to receive reply
420 if (deviceProviderServiceProxy != null) {
421 deviceProviderServiceProxy.register(xid, result);
422 }
423
424 // send message down RPC
425 toDeviceProvider.onNext(msgBuilder.build());
426
427 // wait for reply
428 try {
429 return result.get(10, TimeUnit.SECONDS);
430 } catch (InterruptedException e) {
431 log.debug("isReachable({}) was Interrupted", deviceId, e);
432 Thread.currentThread().interrupt();
433 } catch (TimeoutException e) {
434 log.warn("isReachable({}) Timed out", deviceId, e);
435 } catch (ExecutionException e) {
436 log.error("isReachable({}) Execution failed", deviceId, e);
437 // close session?
438 }
439 return false;
440 // TODO Catch Exceptions and call onError()
441 }
442
443 @Override
444 public ProviderId id() {
445 return checkNotNull(providerId, "not initialized yet");
446 }
447
Saurav Dasa2d37502016-03-25 17:50:40 -0700448 @Override
449 public void changePortState(DeviceId deviceId, PortNumber portNumber,
450 boolean enable) {
451 // TODO if required
452
453 }
454
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800455 }
456}