/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.gnmi.ctl;


import gnmi.Gnmi;
import gnmi.gNMIGrpc;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiEvent;
import org.onosproject.gnmi.api.GnmiUpdate;
import org.onosproject.net.DeviceId;
import org.slf4j.Logger;

import java.net.ConnectException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static java.lang.String.format;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;

/**
 * A manager for the gNMI stream channel that opportunistically creates
 * new stream RPC stubs (e.g. when one fails because of errors) and posts
 * subscribe events via the gNMI controller.
 * <p>
 * The request observer, the response observer of the current stream, the
 * last subscription request and every state transition are guarded by the
 * monitor of the {@code state} field.
 */
final class GnmiSubscriptionManager {

    /**
     * The state of the subscription manager.
     */
    enum State {

        /**
         * No subscription exists.
         */
        INIT,

        /**
         * A subscription exists and the stream channel is open.
         */
        SUBSCRIBED,

        /**
         * A subscription exists, but the stream channel is not open; the
         * periodic checker will try to re-open it.
         */
        RETRYING,
    }

    // FIXME: make this configurable
    private static final long DEFAULT_RECONNECT_DELAY = 5; // Seconds
    private static final Logger log = getLogger(GnmiSubscriptionManager.class);
    private final ManagedChannel channel;
    private final DeviceId deviceId;
    private final GnmiControllerImpl controller;

    private final AtomicReference<State> state = new AtomicReference<>(State.INIT);

    // Request side of the current stream, null when no stream is open.
    private ClientCallStreamObserver<Gnmi.SubscribeRequest> requestObserver;
    // Response side of the current stream. A new instance is created for each
    // stream so that callbacks coming from a stream we already closed can be
    // told apart from callbacks of the current one.
    private StreamObserver<Gnmi.SubscribeResponse> responseObserver;
    // Last request passed to subscribe(), re-sent when reconnecting.
    private Gnmi.SubscribeRequest existingSubscription;
    private final ScheduledExecutorService streamCheckerExecutor =
            newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-probe", "%d", log));

    /**
     * Creates a new subscription manager and starts the periodic task that
     * re-opens the stream while the manager is in {@link State#RETRYING}.
     *
     * @param channel    gRPC channel used to create the stream stub
     * @param deviceId   identifier of the device, used for logs and events
     * @param controller controller used to post the received updates
     */
    GnmiSubscriptionManager(ManagedChannel channel, DeviceId deviceId,
                            GnmiControllerImpl controller) {
        this.channel = channel;
        this.deviceId = deviceId;
        this.controller = controller;
        streamCheckerExecutor.scheduleAtFixedRate(this::checkGnmiStream, 0,
                                                  DEFAULT_RECONNECT_DELAY,
                                                  TimeUnit.SECONDS);
    }

    /**
     * Stops the periodic stream checker and closes the current stream, if any.
     */
    public void shutdown() {
        log.info("gNMI subscription manager for device {} shutdown", deviceId);
        streamCheckerExecutor.shutdown();
        complete();
    }

    /**
     * Opens a new stream if none is currently open. Must be called while
     * holding the monitor of {@code state}.
     */
    private void initIfRequired() {
        if (requestObserver == null) {
            log.debug("Creating new stream channel for {}...", deviceId);
            // Record the observer before starting the call, so that a failure
            // reported right away is attributed to the current stream.
            responseObserver = new InternalStreamResponseObserver();
            requestObserver = (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
                    gNMIGrpc.newStub(channel).subscribe(responseObserver);
        }
    }

    /**
     * Replaces any existing subscription with the given one. The request is
     * remembered and sent again by the periodic checker if the stream fails.
     *
     * @param request subscribe request to send to the device
     * @return true if the request was handed to the stream, false otherwise
     */
    boolean subscribe(Gnmi.SubscribeRequest request) {
        synchronized (state) {
            // Always start from a fresh stream: besides cancelling a live
            // subscription, this also discards a stream that was already
            // reported as failed (RETRYING) instead of writing to it.
            complete();

            existingSubscription = request;
            return send(request);
        }
    }

    /**
     * Sends the given request on the stream, opening it first if needed, and
     * updates the state according to the outcome. Must be called while
     * holding the monitor of {@code state}.
     *
     * @param value subscribe request to send
     * @return true if the request was handed to the stream, false otherwise
     */
    private boolean send(Gnmi.SubscribeRequest value) {
        try {
            initIfRequired();
            requestObserver.onNext(value);
            state.set(State.SUBSCRIBED);
            return true;
        } catch (Throwable ex) {
            if (ex instanceof StatusRuntimeException) {
                log.warn("Unable to send subscribe request to {}: {}",
                        deviceId, ex.getMessage());
            } else {
                log.warn("Exception while sending subscribe request to {}",
                        deviceId, ex);
            }
            state.set(State.RETRYING);
            return false;
        }
    }

    /**
     * Closes the current stream, if any, and moves the manager back to
     * {@link State#INIT}.
     */
    public void complete() {
        synchronized (state) {
            state.set(State.INIT);
            final ClientCallStreamObserver<Gnmi.SubscribeRequest> closing = requestObserver;
            // Detach both observers before closing: termination callbacks
            // caused by the close below are then recognised as stale and do
            // not move the manager to RETRYING. Clearing first also guarantees
            // that a failing close never leaves a dead stream referenced.
            requestObserver = null;
            responseObserver = null;
            if (closing != null) {
                try {
                    closing.onCompleted();
                } catch (RuntimeException ex) {
                    // Presumably the call was already closed; nothing to
                    // half-close, cancel below still releases it.
                    log.debug("Unable to complete stream channel for {}: {}",
                            deviceId, ex.getMessage());
                } finally {
                    closing.cancel("Terminated", null);
                }
            }
        }
    }

    /**
     * Periodic task: when in {@link State#RETRYING}, discards the current
     * stream and sends the last subscription on a new one.
     */
    private void checkGnmiStream() {
        synchronized (state) {
            if (state.get() != State.RETRYING) {
                // No need to retry if the state is not RETRYING
                return;
            }
            log.info("Try reconnecting gNMI stream to device {}", deviceId);

            try {
                complete();
                send(existingSubscription);
            } catch (RuntimeException ex) {
                // An exception escaping a task scheduled at fixed rate
                // suppresses all subsequent executions, which would silently
                // disable reconnection. Log it and try again next round.
                log.warn("Exception while reconnecting gNMI stream to device {}",
                        deviceId, ex);
                state.set(State.RETRYING);
            }
        }
    }

    /**
     * Handles messages received from the device on the stream channel. One
     * instance is created per stream.
     */
    private final class InternalStreamResponseObserver
            implements StreamObserver<Gnmi.SubscribeResponse> {

        @Override
        public void onNext(Gnmi.SubscribeResponse message) {
            try {
                // Pass the message itself so that it is converted to a string
                // only when debug logging is enabled.
                log.debug("Received message on stream channel from {}: {}",
                        deviceId, message);
                GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
                GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
                controller.postEvent(event);
            } catch (Throwable ex) {
                log.error("Exception while processing stream message from {}",
                        deviceId, ex);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            if (throwable instanceof StatusRuntimeException) {
                StatusRuntimeException sre = (StatusRuntimeException) throwable;
                // Use the same cause for the check and for the log message.
                final Throwable cause = sre.getStatus().getCause();
                if (cause instanceof ConnectException) {
                    log.warn("Device {} is unreachable ({})",
                            deviceId, cause.getMessage());
                } else {
                    log.warn("Received error on stream channel for {}: {}",
                            deviceId, throwable.getMessage());
                }
            } else {
                log.warn(format("Received exception on stream channel for %s",
                        deviceId), throwable);
            }
            retryIfCurrent();
        }

        @Override
        public void onCompleted() {
            log.warn("Stream channel for {} has completed", deviceId);
            retryIfCurrent();
        }

        /**
         * Requests a reconnection, unless this observer belongs to a stream
         * that the manager has already closed or replaced.
         */
        private void retryIfCurrent() {
            synchronized (state) {
                if (responseObserver == this) {
                    state.set(State.RETRYING);
                }
            }
        }
    }
}


