blob: 77d1cbe64369eeb07def6ba368a28b716e33992e [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.incubator.rpc.grpc;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static java.util.stream.Collectors.toList;
20import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
21import static org.onosproject.net.DeviceId.deviceId;
22
23import java.io.IOException;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080024import java.util.Map;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080025import java.util.Set;
26import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.ExecutionException;
28import java.util.concurrent.TimeUnit;
29import java.util.concurrent.TimeoutException;
30import java.util.concurrent.atomic.AtomicInteger;
31
32import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
34import org.apache.felix.scr.annotations.Deactivate;
35import org.apache.felix.scr.annotations.Modified;
36import org.apache.felix.scr.annotations.Property;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.onosproject.grpc.Device.DeviceConnected;
40import org.onosproject.grpc.Device.DeviceDisconnected;
41import org.onosproject.grpc.Device.DeviceProviderMsg;
42import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
43import org.onosproject.grpc.Device.IsReachableResponse;
44import org.onosproject.grpc.Device.PortStatusChanged;
45import org.onosproject.grpc.Device.ReceivedRoleReply;
46import org.onosproject.grpc.Device.RegisterProvider;
47import org.onosproject.grpc.Device.UpdatePortStatistics;
48import org.onosproject.grpc.Device.UpdatePorts;
49import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
50import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080051import org.onosproject.grpc.LinkProviderServiceRpcGrpc;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080052import org.onosproject.net.DeviceId;
53import org.onosproject.net.MastershipRole;
Saurav Dasa2d37502016-03-25 17:50:40 -070054import org.onosproject.net.PortNumber;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080055import org.onosproject.net.device.DeviceProvider;
56import org.onosproject.net.device.DeviceProviderRegistry;
57import org.onosproject.net.device.DeviceProviderService;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080058import org.onosproject.net.link.LinkProvider;
59import org.onosproject.net.link.LinkProviderRegistry;
60import org.onosproject.net.link.LinkProviderService;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080061import org.onosproject.net.provider.ProviderId;
62import org.osgi.service.component.ComponentContext;
63import org.slf4j.Logger;
64import org.slf4j.LoggerFactory;
65
66import com.google.common.cache.Cache;
67import com.google.common.cache.CacheBuilder;
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080068import com.google.common.collect.Maps;
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080069import com.google.common.collect.Sets;
70
71import io.grpc.Server;
72import io.grpc.netty.NettyServerBuilder;
73import io.grpc.stub.StreamObserver;
74
// gRPC Server on Metro-side
// Translates requests received on the RPC channel, and calls the corresponding
// Service on the Metro-ONOS cluster.

// Currently supports DeviceProviderRegistry, LinkProviderService
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080080/**
81 * Server side implementation of gRPC based RemoteService.
82 */
83@Component(immediate = true)
84public class GrpcRemoteServiceServer {
85
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080086 static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080087
88 // TODO pick a number
89 public static final int DEFAULT_LISTEN_PORT = 11984;
90
91 private final Logger log = LoggerFactory.getLogger(getClass());
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected DeviceProviderRegistry deviceProviderRegistry;
95
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -080096 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected LinkProviderRegistry linkProviderRegistry;
98
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -080099
100 @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT,
101 label = "Port to listen on")
102 protected int listenPort = DEFAULT_LISTEN_PORT;
103
104 private Server server;
105 private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet();
106
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800107 // scheme -> ...
108 // updates must be guarded by synchronizing `this`
109 private final Map<String, LinkProviderService> linkProviderServices = Maps.newConcurrentMap();
110 private final Map<String, LinkProvider> linkProviders = Maps.newConcurrentMap();
111
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800112 @Activate
113 protected void activate(ComponentContext context) throws IOException {
114 modified(context);
115
116 log.debug("Server starting on {}", listenPort);
117 try {
118 server = NettyServerBuilder.forPort(listenPort)
119 .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy()))
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800120 .addService(LinkProviderServiceRpcGrpc.bindService(new LinkProviderServiceServerProxy(this)))
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800121 .build().start();
122 } catch (IOException e) {
123 log.error("Failed to start gRPC server", e);
124 throw e;
125 }
126
127 log.info("Started on {}", listenPort);
128 }
129
130 @Deactivate
131 protected void deactivate() {
132
133 registeredProviders.stream()
134 .forEach(deviceProviderRegistry::unregister);
135
136 server.shutdown();
137 // Should we wait for shutdown?
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800138
139 unregisterLinkProviders();
140
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800141 log.info("Stopped");
142 }
143
144 @Modified
145 public void modified(ComponentContext context) {
146 // TODO support dynamic reconfiguration and restarting server?
147 }
148
HIGUCHI Yuta7c1583c2015-12-03 23:08:54 -0800149 /**
150 * Registers {@link StubLinkProvider} for given ProviderId scheme.
151 *
152 * DO NOT DIRECTLY CALL THIS METHOD.
153 * Only expected to be called from {@link #getLinkProviderServiceFor(String)}.
154 *
155 * @param scheme ProviderId scheme.
156 * @return {@link LinkProviderService} registered.
157 */
158 private synchronized LinkProviderService registerStubLinkProvider(String scheme) {
159 StubLinkProvider provider = new StubLinkProvider(scheme);
160 linkProviders.put(scheme, provider);
161 return linkProviderRegistry.register(provider);
162 }
163
164 /**
165 * Unregisters all registered LinkProviders.
166 */
167 private synchronized void unregisterLinkProviders() {
168 linkProviders.values().forEach(linkProviderRegistry::unregister);
169 linkProviders.clear();
170 linkProviderServices.clear();
171 }
172
173 /**
174 * Gets or creates {@link LinkProviderService} registered for given ProviderId scheme.
175 *
176 * @param scheme ProviderId scheme.
177 * @return {@link LinkProviderService}
178 */
179 protected LinkProviderService getLinkProviderServiceFor(String scheme) {
180 return linkProviderServices.computeIfAbsent(scheme, this::registerStubLinkProvider);
181 }
182
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800183 // RPC Server-side code
184 // RPC session Factory
185 /**
186 * Relays DeviceProviderRegistry calls from RPC client.
187 */
188 class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc {
189
190 @Override
191 public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
192 log.trace("DeviceProviderRegistryServerProxy#register called!");
193
194 DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider);
195
196 return new DeviceProviderServiceServerProxy(provider, toDeviceProvider);
197 }
198 }
199
200 // Lower -> Upper Controller message
201 // RPC Server-side code
202 // RPC session handler
203 private final class DeviceProviderServiceServerProxy
204 implements StreamObserver<DeviceProviderServiceMsg> {
205
206 // intentionally shadowing
207 private final Logger log = LoggerFactory.getLogger(getClass());
208
209 private final DeviceProviderServerProxy pairedProvider;
210 private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
211
212 private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable;
213
214 // wrapped providerService
215 private DeviceProviderService deviceProviderService;
216
217
218 DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider,
219 StreamObserver<DeviceProviderMsg> toDeviceProvider) {
220 this.pairedProvider = provider;
221 this.toDeviceProvider = toDeviceProvider;
222 outstandingIsReachable = CacheBuilder.newBuilder()
223 .expireAfterWrite(1, TimeUnit.MINUTES)
224 .build();
225
226 // pair RPC session in other direction
227 provider.pair(this);
228 }
229
230 @Override
231 public void onNext(DeviceProviderServiceMsg msg) {
232 try {
233 log.trace("DeviceProviderServiceServerProxy received: {}", msg);
234 onMethod(msg);
235 } catch (Exception e) {
236 log.error("Exception thrown handling {}", msg, e);
237 onError(e);
238 throw e;
239 }
240 }
241
242 /**
243 * Translates received RPC message to {@link DeviceProviderService} method calls.
244 * @param msg DeviceProviderService message
245 */
246 private void onMethod(DeviceProviderServiceMsg msg) {
247 switch (msg.getMethodCase()) {
248 case REGISTER_PROVIDER:
249 RegisterProvider registerProvider = msg.getRegisterProvider();
250 // TODO Do we care about provider name?
251 pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME));
252 registeredProviders.add(pairedProvider);
253 deviceProviderService = deviceProviderRegistry.register(pairedProvider);
254 break;
255
256 case DEVICE_CONNECTED:
257 DeviceConnected deviceConnected = msg.getDeviceConnected();
258 deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()),
259 translate(deviceConnected.getDeviceDescription()));
260 break;
261 case DEVICE_DISCONNECTED:
262 DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected();
263 deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId()));
264 break;
265 case UPDATE_PORTS:
266 UpdatePorts updatePorts = msg.getUpdatePorts();
267 deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()),
268 updatePorts.getPortDescriptionsList()
269 .stream()
270 .map(GrpcDeviceUtils::translate)
271 .collect(toList()));
272 break;
273 case PORT_STATUS_CHANGED:
274 PortStatusChanged portStatusChanged = msg.getPortStatusChanged();
275 deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()),
276 translate(portStatusChanged.getPortDescription()));
277 break;
278 case RECEIVED_ROLE_REPLY:
279 ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply();
280 deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()),
281 translate(receivedRoleReply.getRequested()),
282 translate(receivedRoleReply.getResponse()));
283 break;
284 case UPDATE_PORT_STATISTICS:
285 UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics();
286 deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()),
287 updatePortStatistics.getPortStatisticsList()
288 .stream()
289 .map(GrpcDeviceUtils::translate)
290 .collect(toList()));
291 break;
292
293 // return value of DeviceProvider#isReachable
294 case IS_REACHABLE_RESPONSE:
295 IsReachableResponse isReachableResponse = msg.getIsReachableResponse();
296 int xid = isReachableResponse.getXid();
297 boolean isReachable = isReachableResponse.getIsReachable();
298 CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid);
299 if (result != null) {
300 result.complete(isReachable);
301 }
302 break;
303
304 case METHOD_NOT_SET:
305 default:
306 log.warn("Unexpected message received {}", msg);
307 break;
308 }
309 }
310
311 @Override
312 public void onCompleted() {
313 log.info("DeviceProviderServiceServerProxy completed");
314 deviceProviderRegistry.unregister(pairedProvider);
315 registeredProviders.remove(pairedProvider);
316 toDeviceProvider.onCompleted();
317 }
318
319 @Override
320 public void onError(Throwable e) {
321 log.error("DeviceProviderServiceServerProxy#onError", e);
322 deviceProviderRegistry.unregister(pairedProvider);
323 registeredProviders.remove(pairedProvider);
324 // TODO What is the proper clean up for bi-di stream on error?
325 // sample suggests no-op
326 toDeviceProvider.onError(e);
327 }
328
329
330 /**
331 * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value.
332 * @param xid IsReachable call ID.
333 * @param reply Future to
334 */
335 void register(int xid, CompletableFuture<Boolean> reply) {
336 outstandingIsReachable.put(xid, reply);
337 }
338
339 }
340
341 // Upper -> Lower Controller message
342 /**
343 * Relay DeviceProvider calls to RPC client.
344 */
345 private final class DeviceProviderServerProxy
346 implements DeviceProvider {
347
348 private final Logger log = LoggerFactory.getLogger(getClass());
349
350 // xid for isReachable calls
351 private final AtomicInteger xidPool = new AtomicInteger();
352 private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
353
354 private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null;
355 private ProviderId providerId;
356
357 DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
358 this.toDeviceProvider = toDeviceProvider;
359 }
360
361 void setProviderId(ProviderId pid) {
362 this.providerId = pid;
363 }
364
365 /**
366 * Registers RPC stream in other direction.
367 * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy}
368 */
369 void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) {
370 this.deviceProviderServiceProxy = deviceProviderServiceProxy;
371 }
372
373 @Override
374 public void triggerProbe(DeviceId deviceId) {
375 log.trace("triggerProbe({})", deviceId);
376 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
377 msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder()
378 .setDeviceId(deviceId.toString())
379 .build());
380 DeviceProviderMsg triggerProbeMsg = msgBuilder.build();
381 toDeviceProvider.onNext(triggerProbeMsg);
382 // TODO Catch Exceptions and call onError()
383 }
384
385 @Override
386 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
387 log.trace("roleChanged({}, {})", deviceId, newRole);
388 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
389 msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder()
390 .setDeviceId(deviceId.toString())
391 .setNewRole(translate(newRole))
392 .build());
393 toDeviceProvider.onNext(msgBuilder.build());
394 // TODO Catch Exceptions and call onError()
395 }
396
397 @Override
398 public boolean isReachable(DeviceId deviceId) {
399 log.trace("isReachable({})", deviceId);
400 CompletableFuture<Boolean> result = new CompletableFuture<>();
401 final int xid = xidPool.incrementAndGet();
402
403 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
404 msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder()
405 .setXid(xid)
406 .setDeviceId(deviceId.toString())
407 .build());
408
409 // Associate xid and register above future some where
410 // in DeviceProviderService channel to receive reply
411 if (deviceProviderServiceProxy != null) {
412 deviceProviderServiceProxy.register(xid, result);
413 }
414
415 // send message down RPC
416 toDeviceProvider.onNext(msgBuilder.build());
417
418 // wait for reply
419 try {
420 return result.get(10, TimeUnit.SECONDS);
421 } catch (InterruptedException e) {
422 log.debug("isReachable({}) was Interrupted", deviceId, e);
423 Thread.currentThread().interrupt();
424 } catch (TimeoutException e) {
425 log.warn("isReachable({}) Timed out", deviceId, e);
426 } catch (ExecutionException e) {
427 log.error("isReachable({}) Execution failed", deviceId, e);
428 // close session?
429 }
430 return false;
431 // TODO Catch Exceptions and call onError()
432 }
433
434 @Override
435 public ProviderId id() {
436 return checkNotNull(providerId, "not initialized yet");
437 }
438
Saurav Dasa2d37502016-03-25 17:50:40 -0700439 @Override
440 public void changePortState(DeviceId deviceId, PortNumber portNumber,
441 boolean enable) {
442 // TODO if required
443
444 }
445
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -0800446 }
447}