blob: a13b29c6f3b7c98691e87c18233e6e7d161320c1 [file] [log] [blame]
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.rpc.grpc;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static java.util.stream.Collectors.toList;
20import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
21import static org.onosproject.net.DeviceId.deviceId;
22
23import java.io.IOException;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080024import java.util.Map;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080025import java.util.Set;
26import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.ExecutionException;
28import java.util.concurrent.TimeUnit;
29import java.util.concurrent.TimeoutException;
30import java.util.concurrent.atomic.AtomicInteger;
31
32import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
34import org.apache.felix.scr.annotations.Deactivate;
35import org.apache.felix.scr.annotations.Modified;
36import org.apache.felix.scr.annotations.Property;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.onosproject.grpc.Device.DeviceConnected;
40import org.onosproject.grpc.Device.DeviceDisconnected;
41import org.onosproject.grpc.Device.DeviceProviderMsg;
42import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
43import org.onosproject.grpc.Device.IsReachableResponse;
44import org.onosproject.grpc.Device.PortStatusChanged;
45import org.onosproject.grpc.Device.ReceivedRoleReply;
46import org.onosproject.grpc.Device.RegisterProvider;
47import org.onosproject.grpc.Device.UpdatePortStatistics;
48import org.onosproject.grpc.Device.UpdatePorts;
49import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
50import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080051import org.onosproject.grpc.LinkProviderServiceRpcGrpc;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080052import org.onosproject.net.DeviceId;
53import org.onosproject.net.MastershipRole;
54import org.onosproject.net.device.DeviceProvider;
55import org.onosproject.net.device.DeviceProviderRegistry;
56import org.onosproject.net.device.DeviceProviderService;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080057import org.onosproject.net.link.LinkProvider;
58import org.onosproject.net.link.LinkProviderRegistry;
59import org.onosproject.net.link.LinkProviderService;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080060import org.onosproject.net.provider.ProviderId;
61import org.osgi.service.component.ComponentContext;
62import org.slf4j.Logger;
63import org.slf4j.LoggerFactory;
64
65import com.google.common.cache.Cache;
66import com.google.common.cache.CacheBuilder;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080067import com.google.common.collect.Maps;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080068import com.google.common.collect.Sets;
69
70import io.grpc.Server;
71import io.grpc.netty.NettyServerBuilder;
72import io.grpc.stub.StreamObserver;
73
74// gRPC Server on Metro-side
75// Translates request received on RPC channel, and calls corresponding Service on
76// Metro-ONOS cluster.
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080077
78// Currently supports DeviceProviderRegistry, LinkProviderService
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080079/**
80 * Server side implementation of gRPC based RemoteService.
81 */
82@Component(immediate = true)
83public class GrpcRemoteServiceServer {
84
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080085 static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080086
87 // TODO pick a number
88 public static final int DEFAULT_LISTEN_PORT = 11984;
89
90 private final Logger log = LoggerFactory.getLogger(getClass());
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected DeviceProviderRegistry deviceProviderRegistry;
94
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080095 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected LinkProviderRegistry linkProviderRegistry;
97
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080098
99 @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT,
100 label = "Port to listen on")
101 protected int listenPort = DEFAULT_LISTEN_PORT;
102
103 private Server server;
104 private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet();
105
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800106 // scheme -> ...
107 // updates must be guarded by synchronizing `this`
108 private final Map<String, LinkProviderService> linkProviderServices = Maps.newConcurrentMap();
109 private final Map<String, LinkProvider> linkProviders = Maps.newConcurrentMap();
110
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800111 @Activate
112 protected void activate(ComponentContext context) throws IOException {
113 modified(context);
114
115 log.debug("Server starting on {}", listenPort);
116 try {
117 server = NettyServerBuilder.forPort(listenPort)
118 .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy()))
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800119 .addService(LinkProviderServiceRpcGrpc.bindService(new LinkProviderServiceServerProxy(this)))
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800120 .build().start();
121 } catch (IOException e) {
122 log.error("Failed to start gRPC server", e);
123 throw e;
124 }
125
126 log.info("Started on {}", listenPort);
127 }
128
129 @Deactivate
130 protected void deactivate() {
131
132 registeredProviders.stream()
133 .forEach(deviceProviderRegistry::unregister);
134
135 server.shutdown();
136 // Should we wait for shutdown?
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800137
138 unregisterLinkProviders();
139
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800140 log.info("Stopped");
141 }
142
143 @Modified
144 public void modified(ComponentContext context) {
145 // TODO support dynamic reconfiguration and restarting server?
146 }
147
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800148 /**
149 * Registers {@link StubLinkProvider} for given ProviderId scheme.
150 *
151 * DO NOT DIRECTLY CALL THIS METHOD.
152 * Only expected to be called from {@link #getLinkProviderServiceFor(String)}.
153 *
154 * @param scheme ProviderId scheme.
155 * @return {@link LinkProviderService} registered.
156 */
157 private synchronized LinkProviderService registerStubLinkProvider(String scheme) {
158 StubLinkProvider provider = new StubLinkProvider(scheme);
159 linkProviders.put(scheme, provider);
160 return linkProviderRegistry.register(provider);
161 }
162
163 /**
164 * Unregisters all registered LinkProviders.
165 */
166 private synchronized void unregisterLinkProviders() {
167 linkProviders.values().forEach(linkProviderRegistry::unregister);
168 linkProviders.clear();
169 linkProviderServices.clear();
170 }
171
172 /**
173 * Gets or creates {@link LinkProviderService} registered for given ProviderId scheme.
174 *
175 * @param scheme ProviderId scheme.
176 * @return {@link LinkProviderService}
177 */
178 protected LinkProviderService getLinkProviderServiceFor(String scheme) {
179 return linkProviderServices.computeIfAbsent(scheme, this::registerStubLinkProvider);
180 }
181
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800182 // RPC Server-side code
183 // RPC session Factory
184 /**
185 * Relays DeviceProviderRegistry calls from RPC client.
186 */
187 class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc {
188
189 @Override
190 public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
191 log.trace("DeviceProviderRegistryServerProxy#register called!");
192
193 DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider);
194
195 return new DeviceProviderServiceServerProxy(provider, toDeviceProvider);
196 }
197 }
198
199 // Lower -> Upper Controller message
200 // RPC Server-side code
201 // RPC session handler
202 private final class DeviceProviderServiceServerProxy
203 implements StreamObserver<DeviceProviderServiceMsg> {
204
205 // intentionally shadowing
206 private final Logger log = LoggerFactory.getLogger(getClass());
207
208 private final DeviceProviderServerProxy pairedProvider;
209 private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
210
211 private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable;
212
213 // wrapped providerService
214 private DeviceProviderService deviceProviderService;
215
216
217 DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider,
218 StreamObserver<DeviceProviderMsg> toDeviceProvider) {
219 this.pairedProvider = provider;
220 this.toDeviceProvider = toDeviceProvider;
221 outstandingIsReachable = CacheBuilder.newBuilder()
222 .expireAfterWrite(1, TimeUnit.MINUTES)
223 .build();
224
225 // pair RPC session in other direction
226 provider.pair(this);
227 }
228
229 @Override
230 public void onNext(DeviceProviderServiceMsg msg) {
231 try {
232 log.trace("DeviceProviderServiceServerProxy received: {}", msg);
233 onMethod(msg);
234 } catch (Exception e) {
235 log.error("Exception thrown handling {}", msg, e);
236 onError(e);
237 throw e;
238 }
239 }
240
241 /**
242 * Translates received RPC message to {@link DeviceProviderService} method calls.
243 * @param msg DeviceProviderService message
244 */
245 private void onMethod(DeviceProviderServiceMsg msg) {
246 switch (msg.getMethodCase()) {
247 case REGISTER_PROVIDER:
248 RegisterProvider registerProvider = msg.getRegisterProvider();
249 // TODO Do we care about provider name?
250 pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME));
251 registeredProviders.add(pairedProvider);
252 deviceProviderService = deviceProviderRegistry.register(pairedProvider);
253 break;
254
255 case DEVICE_CONNECTED:
256 DeviceConnected deviceConnected = msg.getDeviceConnected();
257 deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()),
258 translate(deviceConnected.getDeviceDescription()));
259 break;
260 case DEVICE_DISCONNECTED:
261 DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected();
262 deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId()));
263 break;
264 case UPDATE_PORTS:
265 UpdatePorts updatePorts = msg.getUpdatePorts();
266 deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()),
267 updatePorts.getPortDescriptionsList()
268 .stream()
269 .map(GrpcDeviceUtils::translate)
270 .collect(toList()));
271 break;
272 case PORT_STATUS_CHANGED:
273 PortStatusChanged portStatusChanged = msg.getPortStatusChanged();
274 deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()),
275 translate(portStatusChanged.getPortDescription()));
276 break;
277 case RECEIVED_ROLE_REPLY:
278 ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply();
279 deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()),
280 translate(receivedRoleReply.getRequested()),
281 translate(receivedRoleReply.getResponse()));
282 break;
283 case UPDATE_PORT_STATISTICS:
284 UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics();
285 deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()),
286 updatePortStatistics.getPortStatisticsList()
287 .stream()
288 .map(GrpcDeviceUtils::translate)
289 .collect(toList()));
290 break;
291
292 // return value of DeviceProvider#isReachable
293 case IS_REACHABLE_RESPONSE:
294 IsReachableResponse isReachableResponse = msg.getIsReachableResponse();
295 int xid = isReachableResponse.getXid();
296 boolean isReachable = isReachableResponse.getIsReachable();
297 CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid);
298 if (result != null) {
299 result.complete(isReachable);
300 }
301 break;
302
303 case METHOD_NOT_SET:
304 default:
305 log.warn("Unexpected message received {}", msg);
306 break;
307 }
308 }
309
310 @Override
311 public void onCompleted() {
312 log.info("DeviceProviderServiceServerProxy completed");
313 deviceProviderRegistry.unregister(pairedProvider);
314 registeredProviders.remove(pairedProvider);
315 toDeviceProvider.onCompleted();
316 }
317
318 @Override
319 public void onError(Throwable e) {
320 log.error("DeviceProviderServiceServerProxy#onError", e);
321 deviceProviderRegistry.unregister(pairedProvider);
322 registeredProviders.remove(pairedProvider);
323 // TODO What is the proper clean up for bi-di stream on error?
324 // sample suggests no-op
325 toDeviceProvider.onError(e);
326 }
327
328
329 /**
330 * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value.
331 * @param xid IsReachable call ID.
332 * @param reply Future to
333 */
334 void register(int xid, CompletableFuture<Boolean> reply) {
335 outstandingIsReachable.put(xid, reply);
336 }
337
338 }
339
340 // Upper -> Lower Controller message
341 /**
342 * Relay DeviceProvider calls to RPC client.
343 */
344 private final class DeviceProviderServerProxy
345 implements DeviceProvider {
346
347 private final Logger log = LoggerFactory.getLogger(getClass());
348
349 // xid for isReachable calls
350 private final AtomicInteger xidPool = new AtomicInteger();
351 private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
352
353 private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null;
354 private ProviderId providerId;
355
356 DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
357 this.toDeviceProvider = toDeviceProvider;
358 }
359
360 void setProviderId(ProviderId pid) {
361 this.providerId = pid;
362 }
363
364 /**
365 * Registers RPC stream in other direction.
366 * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy}
367 */
368 void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) {
369 this.deviceProviderServiceProxy = deviceProviderServiceProxy;
370 }
371
372 @Override
373 public void triggerProbe(DeviceId deviceId) {
374 log.trace("triggerProbe({})", deviceId);
375 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
376 msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder()
377 .setDeviceId(deviceId.toString())
378 .build());
379 DeviceProviderMsg triggerProbeMsg = msgBuilder.build();
380 toDeviceProvider.onNext(triggerProbeMsg);
381 // TODO Catch Exceptions and call onError()
382 }
383
384 @Override
385 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
386 log.trace("roleChanged({}, {})", deviceId, newRole);
387 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
388 msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder()
389 .setDeviceId(deviceId.toString())
390 .setNewRole(translate(newRole))
391 .build());
392 toDeviceProvider.onNext(msgBuilder.build());
393 // TODO Catch Exceptions and call onError()
394 }
395
396 @Override
397 public boolean isReachable(DeviceId deviceId) {
398 log.trace("isReachable({})", deviceId);
399 CompletableFuture<Boolean> result = new CompletableFuture<>();
400 final int xid = xidPool.incrementAndGet();
401
402 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
403 msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder()
404 .setXid(xid)
405 .setDeviceId(deviceId.toString())
406 .build());
407
408 // Associate xid and register above future some where
409 // in DeviceProviderService channel to receive reply
410 if (deviceProviderServiceProxy != null) {
411 deviceProviderServiceProxy.register(xid, result);
412 }
413
414 // send message down RPC
415 toDeviceProvider.onNext(msgBuilder.build());
416
417 // wait for reply
418 try {
419 return result.get(10, TimeUnit.SECONDS);
420 } catch (InterruptedException e) {
421 log.debug("isReachable({}) was Interrupted", deviceId, e);
422 Thread.currentThread().interrupt();
423 } catch (TimeoutException e) {
424 log.warn("isReachable({}) Timed out", deviceId, e);
425 } catch (ExecutionException e) {
426 log.error("isReachable({}) Execution failed", deviceId, e);
427 // close session?
428 }
429 return false;
430 // TODO Catch Exceptions and call onError()
431 }
432
433 @Override
434 public ProviderId id() {
435 return checkNotNull(providerId, "not initialized yet");
436 }
437
438 }
439}