/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.incubator.rpc.grpc;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.stream.Collectors.toList;
import static org.onosproject.incubator.protobuf.net.ProtobufUtils.translate;
import static org.onosproject.net.DeviceId.deviceId;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.util.Tools;
import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc;
import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
import org.onosproject.grpc.net.device.DeviceService.DeviceConnected;
import org.onosproject.grpc.net.device.DeviceService.DeviceDisconnected;
import org.onosproject.grpc.net.device.DeviceService.DeviceProviderMsg;
import org.onosproject.grpc.net.device.DeviceService.DeviceProviderServiceMsg;
import org.onosproject.grpc.net.device.DeviceService.IsReachableResponse;
import org.onosproject.grpc.net.device.DeviceService.PortStatusChanged;
import org.onosproject.grpc.net.device.DeviceService.ReceivedRoleReply;
import org.onosproject.grpc.net.device.DeviceService.RegisterProvider;
import org.onosproject.grpc.net.device.DeviceService.UpdatePortStatistics;
import org.onosproject.grpc.net.device.DeviceService.UpdatePorts;
import org.onosproject.grpc.net.link.LinkProviderServiceRpcGrpc;
import org.onosproject.incubator.protobuf.net.ProtobufUtils;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.link.LinkProvider;
import org.onosproject.net.link.LinkProviderRegistry;
import org.onosproject.net.link.LinkProviderService;
import org.onosproject.net.provider.ProviderId;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;

// gRPC server on the Metro side.
// Translates requests received on the RPC channel and calls the corresponding
// service on the Metro-ONOS cluster.

// Currently supports DeviceProviderRegistry and LinkProviderService.
/**
 * Server side implementation of gRPC based RemoteService.
 */
@Component(immediate = true)
public class GrpcRemoteServiceServer {

    static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";

    // TODO pick a number
    public static final int DEFAULT_LISTEN_PORT = 11984;

    private final Logger log = LoggerFactory.getLogger(getClass());

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceProviderRegistry deviceProviderRegistry;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected LinkProviderRegistry linkProviderRegistry;


    @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT,
            label = "Port to listen on")
    protected int listenPort = DEFAULT_LISTEN_PORT;

    private Server server;
    private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet();

    // ProviderId scheme -> LinkProviderService / LinkProvider registered for that scheme
    // updates must be guarded by synchronizing `this`
    private final Map<String, LinkProviderService> linkProviderServices = Maps.newConcurrentMap();
    private final Map<String, LinkProvider> linkProviders = Maps.newConcurrentMap();

    private ScheduledExecutorService executor;

    /**
     * Activates the component.
     * <p>
     * Creates the shared executor, then builds and starts the gRPC server exposing the
     * DeviceProviderRegistry and LinkProviderService RPCs on {@code listenPort}.
     *
     * @param context component context, passed on to {@link #modified(ComponentContext)}
     * @throws IOException if the gRPC server could not be started
     */
    @Activate
    protected void activate(ComponentContext context) throws IOException {
        executor = newScheduledThreadPool(1, Tools.groupedThreads("grpc", "%d", log));
        modified(context);

        log.debug("Server starting on {}", listenPort);
        boolean started = false;
        try {
            server  = NettyServerBuilder.forPort(listenPort)
                    .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy()))
                    .addService(LinkProviderServiceRpcGrpc.bindService(new LinkProviderServiceServerProxy(this)))
                    .build().start();
            started = true;
        } catch (IOException e) {
            log.error("Failed to start gRPC server", e);
            throw e;
        } finally {
            if (!started) {
                // Activation is about to fail, so deactivate() is not expected to run and
                // release the executor for us; do it here to avoid leaking it.
                executor.shutdownNow();
            }
        }

        log.info("Started on {}", listenPort);
    }

    /**
     * Deactivates the component.
     * <p>
     * Stops the shared executor (waiting up to 5 seconds for it to terminate), unregisters
     * every DeviceProvider still tracked in {@code registeredProviders}, initiates the
     * gRPC server shutdown and finally unregisters all stub LinkProviders.
     */
    @Deactivate
    protected void deactivate() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException interrupted) {
            // keep the interrupt status for whoever called us
            Thread.currentThread().interrupt();
        }

        for (DeviceProviderServerProxy provider : registeredProviders) {
            deviceProviderRegistry.unregister(provider);
        }

        // Should we wait for shutdown?
        server.shutdown();

        unregisterLinkProviders();

        log.info("Stopped");
    }

    /**
     * Handles component (re)configuration.
     * <p>
     * Currently a no-op: nothing is read from the given context, so {@code listenPort}
     * keeps whatever value it already has and the server is not restarted.
     *
     * @param context component context; unused for now
     */
    @Modified
    public void modified(ComponentContext context) {
        // TODO support dynamic reconfiguration and restarting server?
    }

    /**
     * Registers {@link StubLinkProvider} for given ProviderId scheme.
     *
     * DO NOT DIRECTLY CALL THIS METHOD.
     * Only expected to be called from {@link #getLinkProviderServiceFor(String)}.
     *
     * @param scheme ProviderId scheme.
     * @return {@link LinkProviderService} registered.
     */
    private synchronized LinkProviderService registerStubLinkProvider(String scheme) {
        StubLinkProvider provider = new StubLinkProvider(scheme);
        // Register first and record the provider only afterwards: if registration throws,
        // linkProviders must not keep an entry for a provider which was never registered,
        // otherwise unregisterLinkProviders() would later try to unregister it.
        LinkProviderService service = linkProviderRegistry.register(provider);
        linkProviders.put(scheme, provider);
        return service;
    }

    /**
     * Unregisters every stub LinkProvider recorded in {@code linkProviders} and
     * forgets all scheme to provider / provider service associations.
     */
    private synchronized void unregisterLinkProviders() {
        // TODO remove all links registered by these providers
        for (LinkProvider provider : linkProviders.values()) {
            linkProviderRegistry.unregister(provider);
        }
        linkProviders.clear();
        linkProviderServices.clear();
    }

    /**
     * Gets or creates {@link LinkProviderService} registered for given ProviderId scheme.
     * <p>
     * An already known scheme is served by a plain lookup. Creation of a new entry is done
     * while holding the monitor of {@code this}, which is the same lock used by the methods
     * registering and unregistering the stub LinkProviders.
     *
     * @param scheme ProviderId scheme.
     * @return {@link LinkProviderService}
     */
    protected LinkProviderService getLinkProviderServiceFor(String scheme) {
        // fast path: nothing to create, no lock required
        LinkProviderService known = linkProviderServices.get(scheme);
        if (known != null) {
            return known;
        }

        // Take `this` BEFORE entering computeIfAbsent. The mapping function is synchronized
        // on `this`, and the concurrent map may block other updates (such as the clear()
        // issued under `this` when unregistering) while the function runs. Entering the map
        // first and `this` second could therefore dead-lock against that clear().
        synchronized (this) {
            return linkProviderServices.computeIfAbsent(scheme, this::registerStubLinkProvider);
        }
    }

    /**
     * Returns the executor shared by the RPC handlers of this server.
     *
     * @return executor created on activation; {@code null} if the component
     *         has not been activated yet
     */
    protected ScheduledExecutorService getSharedExecutor() {
        return executor;
    }

    // RPC Server-side code
    // RPC session Factory
    /**
     * Relays DeviceProviderRegistry calls from RPC client.
     * <p>
     * Every {@code register} call creates one pair of proxies sharing the stream towards
     * the client: a {@link DeviceProviderServerProxy} and the
     * {@link DeviceProviderServiceServerProxy} returned to handle incoming messages.
     */
    class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc {

        /**
         * Opens a new RPC session.
         *
         * @param toDeviceProvider stream used to send messages to the remote DeviceProvider
         * @return observer handling the messages sent by the remote DeviceProvider
         */
        @Override
        public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
            log.trace("DeviceProviderRegistryServerProxy#register called!");

            final DeviceProviderServerProxy downstream = new DeviceProviderServerProxy(toDeviceProvider);
            final DeviceProviderServiceServerProxy session =
                    new DeviceProviderServiceServerProxy(downstream, toDeviceProvider);
            return session;
        }
    }

    // Lower -> Upper Controller message
    // RPC Server-side code
    // RPC session handler
    /**
     * Handles one RPC session: translates {@link DeviceProviderServiceMsg}s received from
     * the RPC client into calls on the {@link DeviceProviderService} obtained by
     * registering the paired {@link DeviceProviderServerProxy}.
     * <p>
     * The client is expected to send REGISTER_PROVIDER before any other
     * DeviceProviderService call; calls arriving earlier are rejected with an
     * {@link IllegalStateException}, which terminates the session through
     * {@link #onNext(DeviceProviderServiceMsg)}.
     */
    private final class DeviceProviderServiceServerProxy
            implements StreamObserver<DeviceProviderServiceMsg> {

        // intentionally shadowing
        private final Logger log = LoggerFactory.getLogger(getClass());

        private final DeviceProviderServerProxy pairedProvider;
        private final StreamObserver<DeviceProviderMsg> toDeviceProvider;

        // xid -> Future waiting for the matching IS_REACHABLE_RESPONSE.
        // Entries which never receive a response expire 1 minute after being written.
        private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable;

        // wrapped providerService, null until REGISTER_PROVIDER has been handled
        private DeviceProviderService deviceProviderService;


        /**
         * Creates the session handler and pairs it with the given provider proxy.
         *
         * @param provider         proxy relaying DeviceProvider calls to the RPC client
         * @param toDeviceProvider stream towards the RPC client
         */
        DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider,
                                         StreamObserver<DeviceProviderMsg> toDeviceProvider) {
            this.pairedProvider = provider;
            this.toDeviceProvider = toDeviceProvider;
            outstandingIsReachable = CacheBuilder.newBuilder()
                        .expireAfterWrite(1, TimeUnit.MINUTES)
                        .build();

            // pair RPC session in other direction
            provider.pair(this);
        }

        /**
         * Handles a message received from the RPC client.
         * <p>
         * Any exception raised while handling the message closes the session via
         * {@link #onError(Throwable)} and is then re-thrown to the caller.
         *
         * @param msg DeviceProviderService message
         */
        @Override
        public void onNext(DeviceProviderServiceMsg msg) {
            try {
                log.trace("DeviceProviderServiceServerProxy received: {}", msg);
                onMethod(msg);
            } catch (Exception e) {
                log.error("Exception thrown handling {}", msg, e);
                onError(e);
                throw e;
            }
        }

        /**
         * Translates received RPC message to {@link DeviceProviderService} method calls.
         *
         * @param msg DeviceProviderService message
         * @throws IllegalStateException if a DeviceProviderService call is received
         *         before REGISTER_PROVIDER
         */
        private void onMethod(DeviceProviderServiceMsg msg) {
            switch (msg.getMethodCase()) {
            case REGISTER_PROVIDER:
                RegisterProvider registerProvider = msg.getRegisterProvider();
                // TODO Do we care about provider name?
                pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME));
                log.info("registering DeviceProvider {} via gRPC", pairedProvider.id());
                deviceProviderService = deviceProviderRegistry.register(pairedProvider);
                // Track the provider only after registration succeeded, so that the
                // clean-up paths never unregister a provider which was not registered.
                registeredProviders.add(pairedProvider);
                break;

            case DEVICE_CONNECTED:
                DeviceConnected deviceConnected = msg.getDeviceConnected();
                providerService().deviceConnected(deviceId(deviceConnected.getDeviceId()),
                                                  translate(deviceConnected.getDeviceDescription()));
                break;
            case DEVICE_DISCONNECTED:
                DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected();
                providerService().deviceDisconnected(deviceId(deviceDisconnected.getDeviceId()));
                break;
            case UPDATE_PORTS:
                UpdatePorts updatePorts = msg.getUpdatePorts();
                providerService().updatePorts(deviceId(updatePorts.getDeviceId()),
                                              updatePorts.getPortDescriptionsList()
                                                  .stream()
                                                      .map(ProtobufUtils::translate)
                                                      .collect(toList()));
                break;
            case PORT_STATUS_CHANGED:
                PortStatusChanged portStatusChanged = msg.getPortStatusChanged();
                providerService().portStatusChanged(deviceId(portStatusChanged.getDeviceId()),
                                                    translate(portStatusChanged.getPortDescription()));
                break;
            case RECEIVED_ROLE_REPLY:
                ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply();
                providerService().receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()),
                                                    translate(receivedRoleReply.getRequested()),
                                                    translate(receivedRoleReply.getResponse()));
                break;
            case UPDATE_PORT_STATISTICS:
                UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics();
                providerService().updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()),
                                                       updatePortStatistics.getPortStatisticsList()
                                                         .stream()
                                                            .map(ProtobufUtils::translate)
                                                            .collect(toList()));
                break;

            // return value of DeviceProvider#isReachable
            case IS_REACHABLE_RESPONSE:
                IsReachableResponse isReachableResponse = msg.getIsReachableResponse();
                int xid = isReachableResponse.getXid();
                boolean isReachable = isReachableResponse.getIsReachable();
                // remove, so that a response is consumed at most once
                CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid);
                if (result != null) {
                    result.complete(isReachable);
                } else {
                    log.debug("No outstanding isReachable request for xid {}", xid);
                }
                break;

            case METHOD_NOT_SET:
            default:
                log.warn("Unexpected message received {}", msg);
                break;
            }
        }

        /**
         * Returns the wrapped {@link DeviceProviderService}.
         *
         * @return DeviceProviderService obtained on REGISTER_PROVIDER
         * @throws IllegalStateException if REGISTER_PROVIDER has not been handled yet
         */
        private DeviceProviderService providerService() {
            if (deviceProviderService == null) {
                throw new IllegalStateException("DeviceProviderService call received before REGISTER_PROVIDER");
            }
            return deviceProviderService;
        }

        /**
         * Unregisters the paired provider, if it is currently tracked as registered.
         * <p>
         * Safe to call more than once and before REGISTER_PROVIDER has been received.
         */
        private void unregisterPairedProvider() {
            // TODO call deviceDisconnected against all devices
            // registered for this provider scheme

            // Membership in registeredProviders tells whether registration succeeded;
            // remove() returning true only once makes this clean-up run at most once.
            // It also guarantees the provider id is set before id() is called below.
            if (registeredProviders.remove(pairedProvider)) {
                log.info("unregistering DeviceProvider {} via gRPC", pairedProvider.id());
                deviceProviderRegistry.unregister(pairedProvider);
            }
        }

        /**
         * Handles normal termination of the stream from the RPC client:
         * unregisters the paired provider and completes the stream in other direction.
         */
        @Override
        public void onCompleted() {
            log.info("DeviceProviderServiceServerProxy completed");
            unregisterPairedProvider();
            toDeviceProvider.onCompleted();
        }

        /**
         * Handles abnormal termination of the session: unregisters the paired provider
         * and propagates the error to the stream in other direction.
         *
         * @param e cause of the termination
         */
        @Override
        public void onError(Throwable e) {
            log.error("DeviceProviderServiceServerProxy#onError", e);
            try {
                unregisterPairedProvider();
            } finally {
                // TODO What is the proper clean up for bi-di stream on error?
                // sample suggests no-op
                // Notify the other direction even if the unregistration above failed.
                toDeviceProvider.onError(e);
            }
        }


        /**
         * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value.
         *
         * @param xid   IsReachable call ID.
         * @param reply Future to complete once the IS_REACHABLE_RESPONSE
         *              carrying the same xid is received
         */
        void register(int xid, CompletableFuture<Boolean> reply) {
            outstandingIsReachable.put(xid, reply);
        }

    }

    // Upper -> Lower Controller message
    /**
     * Relay DeviceProvider calls to RPC client.
     * <p>
     * Each call is encoded as a {@link DeviceProviderMsg} and sent down the stream given
     * at construction time. An exception raised while relaying a call is logged and
     * reported to that stream via {@code onError}.
     */
    private final class DeviceProviderServerProxy
            implements DeviceProvider {

        private final Logger log = LoggerFactory.getLogger(getClass());

        // xid for isReachable calls
        private final AtomicInteger xidPool = new AtomicInteger();
        private final StreamObserver<DeviceProviderMsg> toDeviceProvider;

        private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null;
        private ProviderId providerId;

        /**
         * Creates a proxy sending its messages to the given stream.
         *
         * @param toDeviceProvider stream towards the RPC client
         */
        DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
            this.toDeviceProvider = toDeviceProvider;
        }

        /**
         * Sets the ProviderId returned by {@link #id()}.
         *
         * @param pid ProviderId of this provider
         */
        void setProviderId(ProviderId pid) {
            providerId = pid;
        }

        /**
         * Registers RPC stream in other direction.
         *
         * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy}
         */
        void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) {
            this.deviceProviderServiceProxy = deviceProviderServiceProxy;
        }

        @Override
        public void triggerProbe(DeviceId deviceId) {
            try {
                log.trace("triggerProbe({})", deviceId);
                DeviceProviderMsg.Builder request = DeviceProviderMsg.newBuilder();
                request.setTriggerProbe(request.getTriggerProbeBuilder()
                                            .setDeviceId(deviceId.toString())
                                            .build());
                toDeviceProvider.onNext(request.build());
            } catch (Exception e) {
                log.error("Exception caught handling triggerProbe({})",
                          deviceId, e);
                toDeviceProvider.onError(e);
            }
        }

        @Override
        public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
            try {
                log.trace("roleChanged({}, {})", deviceId, newRole);
                DeviceProviderMsg.Builder request = DeviceProviderMsg.newBuilder();
                request.setRoleChanged(request.getRoleChangedBuilder()
                                           .setDeviceId(deviceId.toString())
                                           .setNewRole(translate(newRole))
                                           .build());
                DeviceProviderMsg roleChangedMsg = request.build();
                toDeviceProvider.onNext(roleChangedMsg);
            } catch (Exception e) {
                log.error("Exception caught handling onRoleChanged({}, {})",
                          deviceId, newRole, e);
                toDeviceProvider.onError(e);
            }
        }

        @Override
        public boolean isReachable(DeviceId deviceId) {
            boolean reachable;
            try {
                reachable = onIsReachable(deviceId);
            } catch (Exception e) {
                log.error("Exception caught handling onIsReachable({})",
                          deviceId, e);
                toDeviceProvider.onError(e);
                reachable = false;
            }
            return reachable;
        }

        /**
         * Sends an IsReachable request tagged with a fresh xid to the RPC client and
         * waits up to 10 seconds for the reply.
         *
         * @param deviceId device to ask about
         * @return value carried by the reply; {@code false} if the wait was interrupted,
         *         timed out or failed
         */
        private boolean onIsReachable(DeviceId deviceId) {

            log.trace("isReachable({})", deviceId);
            final int xid = xidPool.incrementAndGet();
            final CompletableFuture<Boolean> reply = new CompletableFuture<>();

            DeviceProviderMsg.Builder request = DeviceProviderMsg.newBuilder();
            request.setIsReachableRequest(request.getIsReachableRequestBuilder()
                                              .setXid(xid)
                                              .setDeviceId(deviceId.toString())
                                              .build());

            // the paired session handler completes the future
            // when the reply carrying this xid arrives
            if (deviceProviderServiceProxy != null) {
                deviceProviderServiceProxy.register(xid, reply);
            }

            // send message down RPC
            toDeviceProvider.onNext(request.build());

            // wait for reply
            boolean reachable = false;
            try {
                reachable = reply.get(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                log.debug("isReachable({}) was Interrupted", deviceId, e);
                Thread.currentThread().interrupt();
            } catch (TimeoutException e) {
                log.warn("isReachable({}) Timed out", deviceId, e);
            } catch (ExecutionException e) {
                log.error("isReachable({}) Execution failed", deviceId, e);
                // close session
                toDeviceProvider.onError(e);
            }
            return reachable;
        }

        @Override
        public ProviderId id() {
            return checkNotNull(providerId, "not initialized yet");
        }

        @Override
        public void changePortState(DeviceId deviceId, PortNumber portNumber,
                                    boolean enable) {
            // TODO Implement if required
            log.error("changePortState not supported yet");
            UnsupportedOperationException unsupported =
                    new UnsupportedOperationException("not implemented yet");
            toDeviceProvider.onError(unsupported);
        }

    }
}
