ONOS-3323 gRPC implementation of Remote Service

- Start modelling Device related service (ONOS-3306)
- exclude machine generated code from doc

Change-Id: Idffbcd883f813de79c6f05fedc9475f308efcc31
diff --git a/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java b/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
new file mode 100644
index 0000000..4f43fa6
--- /dev/null
+++ b/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
@@ -0,0 +1,385 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.rpc.grpc;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.stream.Collectors.toList;
+import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
+import static org.onosproject.net.DeviceId.deviceId;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.grpc.Device.DeviceConnected;
+import org.onosproject.grpc.Device.DeviceDisconnected;
+import org.onosproject.grpc.Device.DeviceProviderMsg;
+import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
+import org.onosproject.grpc.Device.IsReachableResponse;
+import org.onosproject.grpc.Device.PortStatusChanged;
+import org.onosproject.grpc.Device.ReceivedRoleReply;
+import org.onosproject.grpc.Device.RegisterProvider;
+import org.onosproject.grpc.Device.UpdatePortStatistics;
+import org.onosproject.grpc.Device.UpdatePorts;
+import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
+import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.device.DeviceProvider;
+import org.onosproject.net.device.DeviceProviderRegistry;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.provider.ProviderId;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;
+
+import io.grpc.Server;
+import io.grpc.netty.NettyServerBuilder;
+import io.grpc.stub.StreamObserver;
+
+// gRPC Server on Metro-side
+// Translates request received on RPC channel, and calls corresponding Service on
+// Metro-ONOS cluster.
+/**
+ * Server side implementation of gRPC based RemoteService.
+ */
+@Component(immediate = true)
+public class GrpcRemoteServiceServer {
+
+    private static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
+
+    // TODO pick a number
+    public static final int DEFAULT_LISTEN_PORT = 11984;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceProviderRegistry deviceProviderRegistry;
+
+
+    @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT,
+            label = "Port to listen on")
+    protected int listenPort = DEFAULT_LISTEN_PORT;
+
+    private Server server;
+    private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet();
+
+    @Activate
+    protected void activate(ComponentContext context) throws IOException {
+        modified(context);
+
+        log.debug("Server starting on {}", listenPort);
+        try {
+            server  = NettyServerBuilder.forPort(listenPort)
+                    .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy()))
+                    .build().start();
+        } catch (IOException e) {
+            log.error("Failed to start gRPC server", e);
+            throw e;
+        }
+
+        log.info("Started on {}", listenPort);
+    }
+
+    @Deactivate
+    protected void deactivate() {
+
+        registeredProviders.stream()
+            .forEach(deviceProviderRegistry::unregister);
+
+        server.shutdown();
+        // Should we wait for shutdown?
+        log.info("Stopped");
+    }
+
+    @Modified
+    public void modified(ComponentContext context) {
+        // TODO support dynamic reconfiguration and restarting server?
+    }
+
+    // RPC Server-side code
+    // RPC session Factory
+    /**
+     * Relays DeviceProviderRegistry calls from RPC client.
+     */
+    class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc {
+
+        @Override
+        public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
+            log.trace("DeviceProviderRegistryServerProxy#register called!");
+
+            DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider);
+
+            return new DeviceProviderServiceServerProxy(provider, toDeviceProvider);
+        }
+    }
+
+    // Lower -> Upper Controller message
+    // RPC Server-side code
+    // RPC session handler
+    private final class DeviceProviderServiceServerProxy
+            implements StreamObserver<DeviceProviderServiceMsg> {
+
+        // intentionally shadowing
+        private final Logger log = LoggerFactory.getLogger(getClass());
+
+        private final DeviceProviderServerProxy pairedProvider;
+        private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
+
+        private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable;
+
+        // wrapped providerService
+        private DeviceProviderService deviceProviderService;
+
+
+        DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider,
+                                         StreamObserver<DeviceProviderMsg> toDeviceProvider) {
+            this.pairedProvider = provider;
+            this.toDeviceProvider = toDeviceProvider;
+            outstandingIsReachable = CacheBuilder.newBuilder()
+                        .expireAfterWrite(1, TimeUnit.MINUTES)
+                        .build();
+
+            // pair RPC session in other direction
+            provider.pair(this);
+        }
+
+        @Override
+        public void onNext(DeviceProviderServiceMsg msg) {
+            try {
+                log.trace("DeviceProviderServiceServerProxy received: {}", msg);
+                onMethod(msg);
+            } catch (Exception e) {
+                log.error("Exception thrown handling {}", msg, e);
+                onError(e);
+                throw e;
+            }
+        }
+
+        /**
+         * Translates received RPC message to {@link DeviceProviderService} method calls.
+         * @param msg DeviceProviderService message
+         */
+        private void onMethod(DeviceProviderServiceMsg msg) {
+            switch (msg.getMethodCase()) {
+            case REGISTER_PROVIDER:
+                RegisterProvider registerProvider = msg.getRegisterProvider();
+                // TODO Do we care about provider name?
+                pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME));
+                registeredProviders.add(pairedProvider);
+                deviceProviderService = deviceProviderRegistry.register(pairedProvider);
+                break;
+
+            case DEVICE_CONNECTED:
+                DeviceConnected deviceConnected = msg.getDeviceConnected();
+                deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()),
+                                                      translate(deviceConnected.getDeviceDescription()));
+                break;
+            case DEVICE_DISCONNECTED:
+                DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected();
+                deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId()));
+                break;
+            case UPDATE_PORTS:
+                UpdatePorts updatePorts = msg.getUpdatePorts();
+                deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()),
+                                                  updatePorts.getPortDescriptionsList()
+                                                      .stream()
+                                                          .map(GrpcDeviceUtils::translate)
+                                                          .collect(toList()));
+                break;
+            case PORT_STATUS_CHANGED:
+                PortStatusChanged portStatusChanged = msg.getPortStatusChanged();
+                deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()),
+                                                        translate(portStatusChanged.getPortDescription()));
+                break;
+            case RECEIVED_ROLE_REPLY:
+                ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply();
+                deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()),
+                                                        translate(receivedRoleReply.getRequested()),
+                                                        translate(receivedRoleReply.getResponse()));
+                break;
+            case UPDATE_PORT_STATISTICS:
+                UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics();
+                deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()),
+                                                           updatePortStatistics.getPortStatisticsList()
+                                                             .stream()
+                                                                .map(GrpcDeviceUtils::translate)
+                                                                .collect(toList()));
+                break;
+
+            // return value of DeviceProvider#isReachable
+            case IS_REACHABLE_RESPONSE:
+                IsReachableResponse isReachableResponse = msg.getIsReachableResponse();
+                int xid = isReachableResponse.getXid();
+                boolean isReachable = isReachableResponse.getIsReachable();
+                CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid);
+                if (result != null) {
+                    result.complete(isReachable);
+                }
+                break;
+
+            case METHOD_NOT_SET:
+            default:
+                log.warn("Unexpected message received {}", msg);
+                break;
+            }
+        }
+
+        @Override
+        public void onCompleted() {
+            log.info("DeviceProviderServiceServerProxy completed");
+            deviceProviderRegistry.unregister(pairedProvider);
+            registeredProviders.remove(pairedProvider);
+            toDeviceProvider.onCompleted();
+        }
+
+        @Override
+        public void onError(Throwable e) {
+            log.error("DeviceProviderServiceServerProxy#onError", e);
+            deviceProviderRegistry.unregister(pairedProvider);
+            registeredProviders.remove(pairedProvider);
+            // TODO What is the proper clean up for bi-di stream on error?
+            // sample suggests no-op
+            toDeviceProvider.onError(e);
+        }
+
+
+        /**
+         * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value.
+         * @param xid   IsReachable call ID.
+         * @param reply Future to
+         */
+        void register(int xid, CompletableFuture<Boolean> reply) {
+            outstandingIsReachable.put(xid, reply);
+        }
+
+    }
+
+    // Upper -> Lower Controller message
+    /**
+     * Relay DeviceProvider calls to RPC client.
+     */
+    private final class DeviceProviderServerProxy
+            implements DeviceProvider {
+
+        private final Logger log = LoggerFactory.getLogger(getClass());
+
+        // xid for isReachable calls
+        private final AtomicInteger xidPool = new AtomicInteger();
+        private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
+
+        private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null;
+        private ProviderId providerId;
+
+        DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
+            this.toDeviceProvider = toDeviceProvider;
+        }
+
+        void setProviderId(ProviderId pid) {
+            this.providerId = pid;
+        }
+
+        /**
+         * Registers RPC stream in other direction.
+         * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy}
+         */
+        void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) {
+            this.deviceProviderServiceProxy  = deviceProviderServiceProxy;
+        }
+
+        @Override
+        public void triggerProbe(DeviceId deviceId) {
+            log.trace("triggerProbe({})", deviceId);
+            DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
+            msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder()
+                                       .setDeviceId(deviceId.toString())
+                                       .build());
+            DeviceProviderMsg triggerProbeMsg = msgBuilder.build();
+            toDeviceProvider.onNext(triggerProbeMsg);
+            // TODO Catch Exceptions and call onError()
+        }
+
+        @Override
+        public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
+            log.trace("roleChanged({}, {})", deviceId, newRole);
+            DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
+            msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder()
+                                          .setDeviceId(deviceId.toString())
+                                          .setNewRole(translate(newRole))
+                                          .build());
+            toDeviceProvider.onNext(msgBuilder.build());
+            // TODO Catch Exceptions and call onError()
+        }
+
+        @Override
+        public boolean isReachable(DeviceId deviceId) {
+            log.trace("isReachable({})", deviceId);
+            CompletableFuture<Boolean> result = new CompletableFuture<>();
+            final int xid = xidPool.incrementAndGet();
+
+            DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
+            msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder()
+                                                 .setXid(xid)
+                                                 .setDeviceId(deviceId.toString())
+                                                 .build());
+
+            // Associate xid and register above future some where
+            // in DeviceProviderService channel to receive reply
+            if (deviceProviderServiceProxy != null) {
+                deviceProviderServiceProxy.register(xid, result);
+            }
+
+            // send message down RPC
+            toDeviceProvider.onNext(msgBuilder.build());
+
+            // wait for reply
+            try {
+                return result.get(10, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                log.debug("isReachable({}) was Interrupted", deviceId, e);
+                Thread.currentThread().interrupt();
+            } catch (TimeoutException e) {
+                log.warn("isReachable({}) Timed out", deviceId, e);
+            } catch (ExecutionException e) {
+                log.error("isReachable({}) Execution failed", deviceId, e);
+                // close session?
+            }
+            return false;
+            // TODO Catch Exceptions and call onError()
+        }
+
+        @Override
+        public ProviderId id() {
+            return checkNotNull(providerId, "not initialized yet");
+        }
+
+    }
+}