| /* |
| * Copyright 2015-present Open Networking Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.onosproject.incubator.rpc.grpc; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static java.util.concurrent.Executors.newScheduledThreadPool; |
| import static java.util.stream.Collectors.toList; |
| import org.onosproject.incubator.protobuf.models.net.device.DeviceProtoTranslator; |
| import org.onosproject.incubator.protobuf.models.net.device.PortProtoTranslator; |
| import org.onosproject.incubator.protobuf.models.net.MastershipRoleProtoTranslator; |
| import static org.onosproject.net.DeviceId.deviceId; |
| |
| import java.io.IOException; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.osgi.service.component.annotations.Activate; |
| import org.osgi.service.component.annotations.Component; |
| import org.osgi.service.component.annotations.Deactivate; |
| import org.osgi.service.component.annotations.Modified; |
| import org.osgi.service.component.annotations.Property; |
| import org.osgi.service.component.annotations.Reference; |
| import org.osgi.service.component.annotations.ReferenceCardinality; |
| import org.onlab.util.Tools; |
| import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpcImplBase; |
| import org.onosproject.grpc.net.device.DeviceService.DeviceConnected; |
| import org.onosproject.grpc.net.device.DeviceService.DeviceDisconnected; |
| import org.onosproject.grpc.net.device.DeviceService.DeviceProviderMsg; |
| import org.onosproject.grpc.net.device.DeviceService.DeviceProviderServiceMsg; |
| import org.onosproject.grpc.net.device.DeviceService.IsReachableResponse; |
| import org.onosproject.grpc.net.device.DeviceService.PortStatusChanged; |
| import org.onosproject.grpc.net.device.DeviceService.ReceivedRoleReply; |
| import org.onosproject.grpc.net.device.DeviceService.RegisterProvider; |
| import org.onosproject.grpc.net.device.DeviceService.UpdatePortStatistics; |
| import org.onosproject.grpc.net.device.DeviceService.UpdatePorts; |
| import org.onosproject.net.DeviceId; |
| import org.onosproject.net.MastershipRole; |
| import org.onosproject.net.PortNumber; |
| import org.onosproject.net.device.DeviceProvider; |
| import org.onosproject.net.device.DeviceProviderRegistry; |
| import org.onosproject.net.device.DeviceProviderService; |
| import org.onosproject.net.link.LinkProvider; |
| import org.onosproject.net.link.LinkProviderRegistry; |
| import org.onosproject.net.link.LinkProviderService; |
| import org.onosproject.net.provider.ProviderId; |
| import org.osgi.service.component.ComponentContext; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| import io.grpc.Server; |
| import io.grpc.netty.NettyServerBuilder; |
| import io.grpc.stub.StreamObserver; |
| |
| // gRPC Server on Metro-side |
| // Translates request received on RPC channel, and calls corresponding Service on |
| // Metro-ONOS cluster. |
| |
| // Currently supports DeviceProviderRegistry, LinkProviderService |
| /** |
| * Server side implementation of gRPC based RemoteService. |
| */ |
| @Component(immediate = true) |
| public class GrpcRemoteServiceServer { |
| |
| static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc"; |
| |
| // TODO pick a number |
| public static final int DEFAULT_LISTEN_PORT = 11984; |
| |
| private final Logger log = LoggerFactory.getLogger(getClass()); |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| protected DeviceProviderRegistry deviceProviderRegistry; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| protected LinkProviderRegistry linkProviderRegistry; |
| |
| /** Port to listen on */ |
| protected int listenPort = DEFAULT_LISTEN_PORT; |
| |
| private Server server; |
| private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet(); |
| |
| // scheme -> ... |
| // updates must be guarded by synchronizing `this` |
| private final Map<String, LinkProviderService> linkProviderServices = Maps.newConcurrentMap(); |
| private final Map<String, LinkProvider> linkProviders = Maps.newConcurrentMap(); |
| |
| private ScheduledExecutorService executor; |
| |
| @Activate |
| protected void activate(ComponentContext context) throws IOException { |
| executor = newScheduledThreadPool(1, Tools.groupedThreads("grpc", "%d", log)); |
| modified(context); |
| |
| log.debug("Server starting on {}", listenPort); |
| try { |
| server = NettyServerBuilder.forPort(listenPort) |
| .addService(new DeviceProviderRegistryServerProxy()) |
| .addService(new LinkProviderServiceServerProxy(this)) |
| .build().start(); |
| } catch (IOException e) { |
| log.error("Failed to start gRPC server", e); |
| throw e; |
| } |
| |
| log.info("Started on {}", listenPort); |
| } |
| |
| @Deactivate |
| protected void deactivate() { |
| executor.shutdown(); |
| try { |
| executor.awaitTermination(5, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| |
| registeredProviders.forEach(deviceProviderRegistry::unregister); |
| |
| server.shutdown(); |
| // Should we wait for shutdown? |
| |
| unregisterLinkProviders(); |
| |
| log.info("Stopped"); |
| } |
| |
| @Modified |
| public void modified(ComponentContext context) { |
| // TODO support dynamic reconfiguration and restarting server? |
| } |
| |
| /** |
| * Registers {@link StubLinkProvider} for given ProviderId scheme. |
| * |
| * DO NOT DIRECTLY CALL THIS METHOD. |
| * Only expected to be called from {@link #getLinkProviderServiceFor(String)}. |
| * |
| * @param scheme ProviderId scheme. |
| * @return {@link LinkProviderService} registered. |
| */ |
| private synchronized LinkProviderService registerStubLinkProvider(String scheme) { |
| StubLinkProvider provider = new StubLinkProvider(scheme); |
| linkProviders.put(scheme, provider); |
| return linkProviderRegistry.register(provider); |
| } |
| |
| /** |
| * Unregisters all registered LinkProviders. |
| */ |
| private synchronized void unregisterLinkProviders() { |
| // TODO remove all links registered by these providers |
| linkProviders.values().forEach(linkProviderRegistry::unregister); |
| linkProviders.clear(); |
| linkProviderServices.clear(); |
| } |
| |
| /** |
| * Gets or creates {@link LinkProviderService} registered for given ProviderId scheme. |
| * |
| * @param scheme ProviderId scheme. |
| * @return {@link LinkProviderService} |
| */ |
| protected LinkProviderService getLinkProviderServiceFor(String scheme) { |
| return linkProviderServices.computeIfAbsent(scheme, this::registerStubLinkProvider); |
| } |
| |
| protected ScheduledExecutorService getSharedExecutor() { |
| return executor; |
| } |
| |
| // RPC Server-side code |
| // RPC session Factory |
| /** |
| * Relays DeviceProviderRegistry calls from RPC client. |
| */ |
| class DeviceProviderRegistryServerProxy extends DeviceProviderRegistryRpcImplBase { |
| |
| @Override |
| public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) { |
| log.trace("DeviceProviderRegistryServerProxy#register called!"); |
| |
| DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider); |
| |
| return new DeviceProviderServiceServerProxy(provider, toDeviceProvider); |
| } |
| } |
| |
| // Lower -> Upper Controller message |
| // RPC Server-side code |
| // RPC session handler |
| private final class DeviceProviderServiceServerProxy |
| implements StreamObserver<DeviceProviderServiceMsg> { |
| |
| // intentionally shadowing |
| private final Logger log = LoggerFactory.getLogger(getClass()); |
| |
| private final DeviceProviderServerProxy pairedProvider; |
| private final StreamObserver<DeviceProviderMsg> toDeviceProvider; |
| |
| private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable; |
| |
| // wrapped providerService |
| private DeviceProviderService deviceProviderService; |
| |
| |
| DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider, |
| StreamObserver<DeviceProviderMsg> toDeviceProvider) { |
| this.pairedProvider = provider; |
| this.toDeviceProvider = toDeviceProvider; |
| outstandingIsReachable = CacheBuilder.newBuilder() |
| .expireAfterWrite(1, TimeUnit.MINUTES) |
| .build(); |
| |
| // pair RPC session in other direction |
| provider.pair(this); |
| } |
| |
| @Override |
| public void onNext(DeviceProviderServiceMsg msg) { |
| try { |
| log.trace("DeviceProviderServiceServerProxy received: {}", msg); |
| onMethod(msg); |
| } catch (Exception e) { |
| log.error("Exception thrown handling {}", msg, e); |
| onError(e); |
| throw e; |
| } |
| } |
| |
| /** |
| * Translates received RPC message to {@link DeviceProviderService} method calls. |
| * @param msg DeviceProviderService message |
| */ |
| private void onMethod(DeviceProviderServiceMsg msg) { |
| switch (msg.getMethodCase()) { |
| case REGISTER_PROVIDER: |
| RegisterProvider registerProvider = msg.getRegisterProvider(); |
| // TODO Do we care about provider name? |
| pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME)); |
| registeredProviders.add(pairedProvider); |
| log.info("registering DeviceProvider {} via gRPC", pairedProvider.id()); |
| deviceProviderService = deviceProviderRegistry.register(pairedProvider); |
| break; |
| |
| case DEVICE_CONNECTED: |
| DeviceConnected deviceConnected = msg.getDeviceConnected(); |
| deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()), |
| DeviceProtoTranslator.translate(deviceConnected.getDeviceDescription())); |
| break; |
| case DEVICE_DISCONNECTED: |
| DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected(); |
| deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId())); |
| break; |
| case UPDATE_PORTS: |
| UpdatePorts updatePorts = msg.getUpdatePorts(); |
| deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()), |
| updatePorts.getPortDescriptionsList() |
| .stream() |
| .map(PortProtoTranslator::translate) |
| .collect(toList())); |
| break; |
| case PORT_STATUS_CHANGED: |
| PortStatusChanged portStatusChanged = msg.getPortStatusChanged(); |
| deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()), |
| PortProtoTranslator.translate(portStatusChanged.getPortDescription())); |
| break; |
| case RECEIVED_ROLE_REPLY: |
| ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply(); |
| deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()), |
| (MastershipRole) MastershipRoleProtoTranslator |
| .translate(receivedRoleReply.getRequested()).get(), |
| (MastershipRole) MastershipRoleProtoTranslator |
| .translate(receivedRoleReply.getResponse()).get()); |
| break; |
| case UPDATE_PORT_STATISTICS: |
| UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics(); |
| deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()), |
| updatePortStatistics.getPortStatisticsList() |
| .stream() |
| .map(PortProtoTranslator::translate) |
| .collect(toList())); |
| break; |
| |
| // return value of DeviceProvider#isReachable |
| case IS_REACHABLE_RESPONSE: |
| IsReachableResponse isReachableResponse = msg.getIsReachableResponse(); |
| int xid = isReachableResponse.getXid(); |
| boolean isReachable = isReachableResponse.getIsReachable(); |
| CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid); |
| if (result != null) { |
| result.complete(isReachable); |
| } |
| break; |
| |
| case METHOD_NOT_SET: |
| default: |
| log.warn("Unexpected message received {}", msg); |
| break; |
| } |
| } |
| |
| @Override |
| public void onCompleted() { |
| log.info("DeviceProviderServiceServerProxy completed"); |
| deviceProviderRegistry.unregister(pairedProvider); |
| registeredProviders.remove(pairedProvider); |
| toDeviceProvider.onCompleted(); |
| } |
| |
| @Override |
| public void onError(Throwable e) { |
| log.error("DeviceProviderServiceServerProxy#onError", e); |
| if (pairedProvider != null) { |
| // TODO call deviceDisconnected against all devices |
| // registered for this provider scheme |
| log.info("unregistering DeviceProvider {} via gRPC", pairedProvider.id()); |
| deviceProviderRegistry.unregister(pairedProvider); |
| registeredProviders.remove(pairedProvider); |
| } |
| // TODO What is the proper clean up for bi-di stream on error? |
| // sample suggests no-op |
| toDeviceProvider.onError(e); |
| } |
| |
| |
| /** |
| * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value. |
| * @param xid IsReachable call ID. |
| * @param reply Future to |
| */ |
| void register(int xid, CompletableFuture<Boolean> reply) { |
| outstandingIsReachable.put(xid, reply); |
| } |
| |
| } |
| |
| // Upper -> Lower Controller message |
| /** |
| * Relay DeviceProvider calls to RPC client. |
| */ |
| private final class DeviceProviderServerProxy |
| implements DeviceProvider { |
| |
| private final Logger log = LoggerFactory.getLogger(getClass()); |
| |
| // xid for isReachable calls |
| private final AtomicInteger xidPool = new AtomicInteger(); |
| private final StreamObserver<DeviceProviderMsg> toDeviceProvider; |
| |
| private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null; |
| private ProviderId providerId; |
| |
| DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) { |
| this.toDeviceProvider = toDeviceProvider; |
| } |
| |
| void setProviderId(ProviderId pid) { |
| this.providerId = pid; |
| } |
| |
| /** |
| * Registers RPC stream in other direction. |
| * |
| * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy} |
| */ |
| void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) { |
| this.deviceProviderServiceProxy = deviceProviderServiceProxy; |
| } |
| |
| @Override |
| public void triggerProbe(DeviceId deviceId) { |
| try { |
| onTriggerProbe(deviceId); |
| } catch (Exception e) { |
| log.error("Exception caught handling triggerProbe({})", |
| deviceId, e); |
| toDeviceProvider.onError(e); |
| } |
| } |
| |
| private void onTriggerProbe(DeviceId deviceId) { |
| log.trace("triggerProbe({})", deviceId); |
| DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder(); |
| msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder() |
| .setDeviceId(deviceId.toString()) |
| .build()); |
| DeviceProviderMsg triggerProbeMsg = msgBuilder.build(); |
| toDeviceProvider.onNext(triggerProbeMsg); |
| } |
| |
| @Override |
| public void roleChanged(DeviceId deviceId, MastershipRole newRole) { |
| try { |
| onRoleChanged(deviceId, newRole); |
| } catch (Exception e) { |
| log.error("Exception caught handling onRoleChanged({}, {})", |
| deviceId, newRole, e); |
| toDeviceProvider.onError(e); |
| } |
| } |
| |
| private void onRoleChanged(DeviceId deviceId, MastershipRole newRole) { |
| log.trace("roleChanged({}, {})", deviceId, newRole); |
| DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder(); |
| msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder() |
| .setDeviceId(deviceId.toString()) |
| .setNewRole(MastershipRoleProtoTranslator.translate(newRole)) |
| .build()); |
| toDeviceProvider.onNext(msgBuilder.build()); |
| } |
| |
| @Override |
| public boolean isReachable(DeviceId deviceId) { |
| try { |
| return onIsReachable(deviceId); |
| } catch (Exception e) { |
| log.error("Exception caught handling onIsReachable({})", |
| deviceId, e); |
| toDeviceProvider.onError(e); |
| return false; |
| } |
| } |
| |
| private boolean onIsReachable(DeviceId deviceId) { |
| |
| log.trace("isReachable({})", deviceId); |
| CompletableFuture<Boolean> result = new CompletableFuture<>(); |
| final int xid = xidPool.incrementAndGet(); |
| |
| DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder(); |
| msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder() |
| .setXid(xid) |
| .setDeviceId(deviceId.toString()) |
| .build()); |
| |
| // Associate xid and register above future some where |
| // in DeviceProviderService channel to receive reply |
| if (deviceProviderServiceProxy != null) { |
| deviceProviderServiceProxy.register(xid, result); |
| } |
| |
| // send message down RPC |
| toDeviceProvider.onNext(msgBuilder.build()); |
| |
| // wait for reply |
| try { |
| return result.get(10, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| log.debug("isReachable({}) was Interrupted", deviceId, e); |
| Thread.currentThread().interrupt(); |
| } catch (TimeoutException e) { |
| log.warn("isReachable({}) Timed out", deviceId, e); |
| } catch (ExecutionException e) { |
| log.error("isReachable({}) Execution failed", deviceId, e); |
| // close session |
| toDeviceProvider.onError(e); |
| } |
| return false; |
| } |
| |
| @Override |
| public ProviderId id() { |
| return checkNotNull(providerId, "not initialized yet"); |
| } |
| |
| @Override |
| public void changePortState(DeviceId deviceId, PortNumber portNumber, |
| boolean enable) { |
| // TODO Implement if required |
| log.error("changePortState not supported yet"); |
| toDeviceProvider.onError(new UnsupportedOperationException("not implemented yet")); |
| } |
| |
| } |
| } |