blob: f14b4ef4f1a0c193c4dd4ad675af51216da0966e [file] [log] [blame]
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.rpc.grpc;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static java.util.stream.Collectors.toList;
20import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
21import static org.onosproject.net.DeviceId.deviceId;
22
23import java.io.IOException;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080024import java.util.Map;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080025import java.util.Set;
26import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.ExecutionException;
28import java.util.concurrent.TimeUnit;
29import java.util.concurrent.TimeoutException;
30import java.util.concurrent.atomic.AtomicInteger;
31
32import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
34import org.apache.felix.scr.annotations.Deactivate;
35import org.apache.felix.scr.annotations.Modified;
36import org.apache.felix.scr.annotations.Property;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.onosproject.grpc.Device.DeviceConnected;
40import org.onosproject.grpc.Device.DeviceDisconnected;
41import org.onosproject.grpc.Device.DeviceProviderMsg;
42import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
43import org.onosproject.grpc.Device.IsReachableResponse;
44import org.onosproject.grpc.Device.PortStatusChanged;
45import org.onosproject.grpc.Device.ReceivedRoleReply;
46import org.onosproject.grpc.Device.RegisterProvider;
47import org.onosproject.grpc.Device.UpdatePortStatistics;
48import org.onosproject.grpc.Device.UpdatePorts;
49import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
50import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080051import org.onosproject.grpc.LinkProviderServiceRpcGrpc;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080052import org.onosproject.net.DeviceId;
53import org.onosproject.net.MastershipRole;
Saurav Dasa2d37502016-03-25 17:50:40 -070054import org.onosproject.net.PortNumber;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080055import org.onosproject.net.device.DeviceProvider;
56import org.onosproject.net.device.DeviceProviderRegistry;
57import org.onosproject.net.device.DeviceProviderService;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080058import org.onosproject.net.link.LinkProvider;
59import org.onosproject.net.link.LinkProviderRegistry;
60import org.onosproject.net.link.LinkProviderService;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080061import org.onosproject.net.provider.ProviderId;
62import org.osgi.service.component.ComponentContext;
63import org.slf4j.Logger;
64import org.slf4j.LoggerFactory;
65
66import com.google.common.cache.Cache;
67import com.google.common.cache.CacheBuilder;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080068import com.google.common.collect.Maps;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080069import com.google.common.collect.Sets;
70
71import io.grpc.Server;
72import io.grpc.netty.NettyServerBuilder;
73import io.grpc.stub.StreamObserver;
74
// gRPC server on the Metro side.
// Translates requests received on the RPC channel and calls the corresponding
// service on the Metro-ONOS cluster.

// Currently supports DeviceProviderRegistry and LinkProviderService.
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080080/**
81 * Server side implementation of gRPC based RemoteService.
82 */
83@Component(immediate = true)
84public class GrpcRemoteServiceServer {
85
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080086 static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080087
88 // TODO pick a number
89 public static final int DEFAULT_LISTEN_PORT = 11984;
90
91 private final Logger log = LoggerFactory.getLogger(getClass());
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected DeviceProviderRegistry deviceProviderRegistry;
95
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080096 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected LinkProviderRegistry linkProviderRegistry;
98
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080099
100 @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT,
101 label = "Port to listen on")
102 protected int listenPort = DEFAULT_LISTEN_PORT;
103
104 private Server server;
105 private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet();
106
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800107 // scheme -> ...
108 // updates must be guarded by synchronizing `this`
109 private final Map<String, LinkProviderService> linkProviderServices = Maps.newConcurrentMap();
110 private final Map<String, LinkProvider> linkProviders = Maps.newConcurrentMap();
111
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800112 @Activate
113 protected void activate(ComponentContext context) throws IOException {
114 modified(context);
115
116 log.debug("Server starting on {}", listenPort);
117 try {
118 server = NettyServerBuilder.forPort(listenPort)
119 .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy()))
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800120 .addService(LinkProviderServiceRpcGrpc.bindService(new LinkProviderServiceServerProxy(this)))
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800121 .build().start();
122 } catch (IOException e) {
123 log.error("Failed to start gRPC server", e);
124 throw e;
125 }
126
127 log.info("Started on {}", listenPort);
128 }
129
130 @Deactivate
131 protected void deactivate() {
132
133 registeredProviders.stream()
134 .forEach(deviceProviderRegistry::unregister);
135
136 server.shutdown();
137 // Should we wait for shutdown?
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800138
139 unregisterLinkProviders();
140
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800141 log.info("Stopped");
142 }
143
144 @Modified
145 public void modified(ComponentContext context) {
146 // TODO support dynamic reconfiguration and restarting server?
147 }
148
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800149 /**
150 * Registers {@link StubLinkProvider} for given ProviderId scheme.
151 *
152 * DO NOT DIRECTLY CALL THIS METHOD.
153 * Only expected to be called from {@link #getLinkProviderServiceFor(String)}.
154 *
155 * @param scheme ProviderId scheme.
156 * @return {@link LinkProviderService} registered.
157 */
158 private synchronized LinkProviderService registerStubLinkProvider(String scheme) {
159 StubLinkProvider provider = new StubLinkProvider(scheme);
160 linkProviders.put(scheme, provider);
161 return linkProviderRegistry.register(provider);
162 }
163
164 /**
165 * Unregisters all registered LinkProviders.
166 */
167 private synchronized void unregisterLinkProviders() {
HIGUCHI Yuta6381a242016-03-13 23:29:10 -0700168 // TODO remove all links registered by these providers
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800169 linkProviders.values().forEach(linkProviderRegistry::unregister);
170 linkProviders.clear();
171 linkProviderServices.clear();
172 }
173
174 /**
175 * Gets or creates {@link LinkProviderService} registered for given ProviderId scheme.
176 *
177 * @param scheme ProviderId scheme.
178 * @return {@link LinkProviderService}
179 */
180 protected LinkProviderService getLinkProviderServiceFor(String scheme) {
181 return linkProviderServices.computeIfAbsent(scheme, this::registerStubLinkProvider);
182 }
183
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800184 // RPC Server-side code
185 // RPC session Factory
186 /**
187 * Relays DeviceProviderRegistry calls from RPC client.
188 */
189 class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc {
190
191 @Override
192 public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
193 log.trace("DeviceProviderRegistryServerProxy#register called!");
194
195 DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider);
196
197 return new DeviceProviderServiceServerProxy(provider, toDeviceProvider);
198 }
199 }
200
201 // Lower -> Upper Controller message
202 // RPC Server-side code
203 // RPC session handler
204 private final class DeviceProviderServiceServerProxy
205 implements StreamObserver<DeviceProviderServiceMsg> {
206
207 // intentionally shadowing
208 private final Logger log = LoggerFactory.getLogger(getClass());
209
210 private final DeviceProviderServerProxy pairedProvider;
211 private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
212
213 private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable;
214
215 // wrapped providerService
216 private DeviceProviderService deviceProviderService;
217
218
219 DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider,
220 StreamObserver<DeviceProviderMsg> toDeviceProvider) {
221 this.pairedProvider = provider;
222 this.toDeviceProvider = toDeviceProvider;
223 outstandingIsReachable = CacheBuilder.newBuilder()
224 .expireAfterWrite(1, TimeUnit.MINUTES)
225 .build();
226
227 // pair RPC session in other direction
228 provider.pair(this);
229 }
230
231 @Override
232 public void onNext(DeviceProviderServiceMsg msg) {
233 try {
234 log.trace("DeviceProviderServiceServerProxy received: {}", msg);
235 onMethod(msg);
236 } catch (Exception e) {
237 log.error("Exception thrown handling {}", msg, e);
238 onError(e);
239 throw e;
240 }
241 }
242
243 /**
244 * Translates received RPC message to {@link DeviceProviderService} method calls.
245 * @param msg DeviceProviderService message
246 */
247 private void onMethod(DeviceProviderServiceMsg msg) {
248 switch (msg.getMethodCase()) {
249 case REGISTER_PROVIDER:
250 RegisterProvider registerProvider = msg.getRegisterProvider();
251 // TODO Do we care about provider name?
252 pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME));
253 registeredProviders.add(pairedProvider);
HIGUCHI Yuta6381a242016-03-13 23:29:10 -0700254 log.info("registering DeviceProvider {} via gRPC", pairedProvider.id());
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800255 deviceProviderService = deviceProviderRegistry.register(pairedProvider);
256 break;
257
258 case DEVICE_CONNECTED:
259 DeviceConnected deviceConnected = msg.getDeviceConnected();
260 deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()),
261 translate(deviceConnected.getDeviceDescription()));
262 break;
263 case DEVICE_DISCONNECTED:
264 DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected();
265 deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId()));
266 break;
267 case UPDATE_PORTS:
268 UpdatePorts updatePorts = msg.getUpdatePorts();
269 deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()),
270 updatePorts.getPortDescriptionsList()
271 .stream()
272 .map(GrpcDeviceUtils::translate)
273 .collect(toList()));
274 break;
275 case PORT_STATUS_CHANGED:
276 PortStatusChanged portStatusChanged = msg.getPortStatusChanged();
277 deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()),
278 translate(portStatusChanged.getPortDescription()));
279 break;
280 case RECEIVED_ROLE_REPLY:
281 ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply();
282 deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()),
283 translate(receivedRoleReply.getRequested()),
284 translate(receivedRoleReply.getResponse()));
285 break;
286 case UPDATE_PORT_STATISTICS:
287 UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics();
288 deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()),
289 updatePortStatistics.getPortStatisticsList()
290 .stream()
291 .map(GrpcDeviceUtils::translate)
292 .collect(toList()));
293 break;
294
295 // return value of DeviceProvider#isReachable
296 case IS_REACHABLE_RESPONSE:
297 IsReachableResponse isReachableResponse = msg.getIsReachableResponse();
298 int xid = isReachableResponse.getXid();
299 boolean isReachable = isReachableResponse.getIsReachable();
300 CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid);
301 if (result != null) {
302 result.complete(isReachable);
303 }
304 break;
305
306 case METHOD_NOT_SET:
307 default:
308 log.warn("Unexpected message received {}", msg);
309 break;
310 }
311 }
312
313 @Override
314 public void onCompleted() {
315 log.info("DeviceProviderServiceServerProxy completed");
316 deviceProviderRegistry.unregister(pairedProvider);
317 registeredProviders.remove(pairedProvider);
318 toDeviceProvider.onCompleted();
319 }
320
321 @Override
322 public void onError(Throwable e) {
323 log.error("DeviceProviderServiceServerProxy#onError", e);
HIGUCHI Yuta6381a242016-03-13 23:29:10 -0700324 if (pairedProvider != null) {
325 // TODO call deviceDisconnected against all devices
326 // registered for this provider scheme
327 log.info("unregistering DeviceProvider {} via gRPC", pairedProvider.id());
328 deviceProviderRegistry.unregister(pairedProvider);
329 registeredProviders.remove(pairedProvider);
330 }
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800331 // TODO What is the proper clean up for bi-di stream on error?
332 // sample suggests no-op
333 toDeviceProvider.onError(e);
334 }
335
336
337 /**
338 * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value.
339 * @param xid IsReachable call ID.
340 * @param reply Future to
341 */
342 void register(int xid, CompletableFuture<Boolean> reply) {
343 outstandingIsReachable.put(xid, reply);
344 }
345
346 }
347
348 // Upper -> Lower Controller message
349 /**
350 * Relay DeviceProvider calls to RPC client.
351 */
352 private final class DeviceProviderServerProxy
353 implements DeviceProvider {
354
355 private final Logger log = LoggerFactory.getLogger(getClass());
356
357 // xid for isReachable calls
358 private final AtomicInteger xidPool = new AtomicInteger();
359 private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
360
361 private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null;
362 private ProviderId providerId;
363
364 DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
365 this.toDeviceProvider = toDeviceProvider;
366 }
367
368 void setProviderId(ProviderId pid) {
369 this.providerId = pid;
370 }
371
372 /**
373 * Registers RPC stream in other direction.
HIGUCHI Yuta6381a242016-03-13 23:29:10 -0700374 *
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800375 * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy}
376 */
377 void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) {
378 this.deviceProviderServiceProxy = deviceProviderServiceProxy;
379 }
380
381 @Override
382 public void triggerProbe(DeviceId deviceId) {
383 log.trace("triggerProbe({})", deviceId);
384 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
385 msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder()
386 .setDeviceId(deviceId.toString())
387 .build());
388 DeviceProviderMsg triggerProbeMsg = msgBuilder.build();
389 toDeviceProvider.onNext(triggerProbeMsg);
390 // TODO Catch Exceptions and call onError()
391 }
392
393 @Override
394 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
395 log.trace("roleChanged({}, {})", deviceId, newRole);
396 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
397 msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder()
398 .setDeviceId(deviceId.toString())
399 .setNewRole(translate(newRole))
400 .build());
401 toDeviceProvider.onNext(msgBuilder.build());
402 // TODO Catch Exceptions and call onError()
403 }
404
405 @Override
406 public boolean isReachable(DeviceId deviceId) {
407 log.trace("isReachable({})", deviceId);
408 CompletableFuture<Boolean> result = new CompletableFuture<>();
409 final int xid = xidPool.incrementAndGet();
410
411 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
412 msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder()
413 .setXid(xid)
414 .setDeviceId(deviceId.toString())
415 .build());
416
417 // Associate xid and register above future some where
418 // in DeviceProviderService channel to receive reply
419 if (deviceProviderServiceProxy != null) {
420 deviceProviderServiceProxy.register(xid, result);
421 }
422
423 // send message down RPC
424 toDeviceProvider.onNext(msgBuilder.build());
425
426 // wait for reply
427 try {
428 return result.get(10, TimeUnit.SECONDS);
429 } catch (InterruptedException e) {
430 log.debug("isReachable({}) was Interrupted", deviceId, e);
431 Thread.currentThread().interrupt();
432 } catch (TimeoutException e) {
433 log.warn("isReachable({}) Timed out", deviceId, e);
434 } catch (ExecutionException e) {
435 log.error("isReachable({}) Execution failed", deviceId, e);
436 // close session?
437 }
438 return false;
439 // TODO Catch Exceptions and call onError()
440 }
441
442 @Override
443 public ProviderId id() {
444 return checkNotNull(providerId, "not initialized yet");
445 }
446
Saurav Dasa2d37502016-03-25 17:50:40 -0700447 @Override
448 public void changePortState(DeviceId deviceId, PortNumber portNumber,
449 boolean enable) {
450 // TODO if required
451
452 }
453
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800454 }
455}