/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.provider.general.device.impl;

import com.google.common.annotations.Beta;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Striped;
import gnmi.Gnmi.Notification;
import gnmi.Gnmi.Path;
import gnmi.Gnmi.PathElem;
import gnmi.Gnmi.SubscribeRequest;
import gnmi.Gnmi.Subscription;
import gnmi.Gnmi.SubscriptionList;
import gnmi.Gnmi.SubscriptionMode;
import gnmi.Gnmi.Update;
import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiController;
import org.onosproject.gnmi.api.GnmiEvent;
import org.onosproject.gnmi.api.GnmiEventListener;
import org.onosproject.gnmi.api.GnmiUpdate;
import org.onosproject.gnmi.api.GnmiUtils;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;

/**
 * Entity that manages gNMI subscription for devices using OpenConfig models and
 * that reports relevant events to the core.
 */
@Beta
class GnmiDeviceStateSubscriber {

    private static final String LAST_CHANGE = "last-changed";

    private static Logger log = LoggerFactory.getLogger(GnmiDeviceStateSubscriber.class);

    private final GnmiController gnmiController;
    private final DeviceService deviceService;
    private final DeviceProviderService providerService;
    private final MastershipService mastershipService;

    private final InternalGnmiEventListener gnmiEventListener = new InternalGnmiEventListener();
    private final InternalDeviceListener deviceEventListener = new InternalDeviceListener();
    private final InternalMastershipListener mastershipListener = new InternalMastershipListener();
    private final Map<DeviceId, Set<PortNumber>> deviceSubscribed = Maps.newHashMap();

    private final Striped<Lock> deviceLocks = Striped.lock(30);

    GnmiDeviceStateSubscriber(GnmiController gnmiController, DeviceService deviceService,
                              MastershipService mastershipService,
                              DeviceProviderService providerService) {
        this.gnmiController = gnmiController;
        this.deviceService = deviceService;
        this.mastershipService = mastershipService;
        this.providerService = providerService;
    }

    public void activate() {
        deviceService.addListener(deviceEventListener);
        mastershipService.addListener(mastershipListener);
        gnmiController.addListener(gnmiEventListener);
        // Subscribe to existing devices.
        deviceService.getDevices().forEach(d -> checkSubscription(d.id()));
    }

    public void deactivate() {
        deviceSubscribed.keySet().forEach(this::unsubscribeIfNeeded);
        deviceService.removeListener(deviceEventListener);
        mastershipService.removeListener(mastershipListener);
        gnmiController.removeListener(gnmiEventListener);
    }

    private void checkSubscription(DeviceId deviceId) {
        if (gnmiController.getClient(deviceId) == null) {
            // Ignore devices for which a gNMI client does not exist.
            return;
        }
        deviceLocks.get(deviceId).lock();
        try {
            if (shouldHaveSubscription(deviceId)) {
                subscribeIfNeeded(deviceId);
            } else {
                unsubscribeIfNeeded(deviceId);
            }
        } finally {
            deviceLocks.get(deviceId).unlock();
        }
    }

    private boolean shouldHaveSubscription(DeviceId deviceId) {
        return deviceService.getDevice(deviceId) != null
                && deviceService.isAvailable(deviceId)
                && mastershipService.isLocalMaster(deviceId)
                && !deviceService.getPorts(deviceId).isEmpty();
    }

    private Path interfaceOperStatusPath(String interfaceName) {
        return Path.newBuilder()
                .addElem(PathElem.newBuilder().setName("interfaces").build())
                .addElem(PathElem.newBuilder()
                                 .setName("interface").putKey("name", interfaceName).build())
                .addElem(PathElem.newBuilder().setName("state").build())
                .addElem(PathElem.newBuilder().setName("oper-status").build())
                .build();
    }

    private void unsubscribeIfNeeded(DeviceId deviceId) {
        gnmiController.getClient(deviceId).unsubscribe();
        if (deviceSubscribed.remove(deviceId) != null) {
            log.info("Cancelled gNMI subscription for {}", deviceId);
        }
    }

    private void subscribeIfNeeded(DeviceId deviceId) {

        Set<PortNumber> ports = deviceService.getPorts(deviceId).stream()
                .map(Port::number)
                .collect(Collectors.toSet());

        if (Objects.equals(ports, deviceSubscribed.get(deviceId))) {
            // Already subscribed for the same ports.
            return;
        }

        GnmiClient client = gnmiController.getClient(deviceId);

        SubscriptionList.Builder subscriptionList = SubscriptionList.newBuilder();
        subscriptionList.setMode(SubscriptionList.Mode.STREAM);
        subscriptionList.setUpdatesOnly(true);

        ports.forEach(port -> {
            String portName = port.name();
            // Subscribe /interface/interface[name=port-name]/state/oper-status
            Path subscribePath = interfaceOperStatusPath(portName);
            Subscription interfaceOperStatusSub =
                    Subscription.newBuilder()
                            .setPath(subscribePath)
                            .setMode(SubscriptionMode.ON_CHANGE)
                            .build();
            // TODO: more state subscription
            subscriptionList.addSubscription(interfaceOperStatusSub);
        });

        SubscribeRequest subscribeRequest = SubscribeRequest.newBuilder()
                .setSubscribe(subscriptionList.build())
                .build();

        client.subscribe(subscribeRequest);

        log.info("Started gNMI subscription for {} ports on {}", ports.size(), deviceId);

        deviceSubscribed.put(deviceId, ports);
    }

    private void handleGnmiUpdate(GnmiUpdate eventSubject) {
        Notification notification = eventSubject.update();
        if (notification == null) {
            log.warn("Cannot handle gNMI event without update data, abort");
            log.debug("gNMI update:\n{}", eventSubject);
            return;
        }

        List<Update> updateList = notification.getUpdateList();
        updateList.forEach(update -> {
            Path path = update.getPath();
            PathElem lastElem = path.getElem(path.getElemCount() - 1);

            // Use last element to identify which state updated
            if ("oper-status".equals(lastElem.getName())) {
                handleOperStatusUpdate(eventSubject.deviceId(), update,
                                       notification.getTimestamp());
            } else {
                log.debug("Unrecognized update {}", GnmiUtils.pathToString(path));
            }
        });
    }

    private void handleOperStatusUpdate(DeviceId deviceId, Update update, long timestamp) {
        Path path = update.getPath();
        // first element should be "interface"
        String interfaceName = path.getElem(1).getKeyOrDefault("name", null);
        if (interfaceName == null) {
            log.error("No interface present in gNMI update, abort");
            log.debug("gNMI update:\n{}", update);
            return;
        }

        List<Port> portsFromDevice = deviceService.getPorts(deviceId);
        portsFromDevice.forEach(port -> {
            if (!port.number().name().equals(interfaceName)) {
                return;
            }

            DefaultAnnotations portAnnotations = DefaultAnnotations.builder()
                    .putAll(port.annotations())
                    .set(LAST_CHANGE, String.valueOf(timestamp))
                    .build();

            // Port/Interface name is identical in OpenConfig model, but not in ONOS
            // This might cause some problem if we use one name to different port
            PortDescription portDescription = DefaultPortDescription.builder()
                    .portSpeed(port.portSpeed())
                    .withPortNumber(port.number())
                    .isEnabled(update.getVal().getStringVal().equals("UP"))
                    .type(port.type())
                    .annotations(portAnnotations)
                    .build();
            providerService.portStatusChanged(deviceId, portDescription);
        });
    }

    class InternalGnmiEventListener implements GnmiEventListener {

        @Override
        public void event(GnmiEvent event) {
            if (!deviceSubscribed.containsKey(event.subject().deviceId())) {
                log.warn("Received gNMI event from {}, but we did'nt expect to " +
                                 "be subscribed to it! Discarding event...",
                         event.subject().deviceId());
                return;
            }

            log.debug("Received gNMI event {}", event.toString());
            if (event.type() == GnmiEvent.Type.UPDATE) {
                handleGnmiUpdate((GnmiUpdate) event.subject());
            } else {
                log.debug("Unsupported gNMI event type: {}", event.type());
            }
        }
    }

    class InternalMastershipListener implements MastershipListener {

        @Override
        public void event(MastershipEvent event) {
            checkSubscription(event.subject());
        }
    }

    class InternalDeviceListener implements DeviceListener {

        @Override
        public void event(DeviceEvent event) {
            switch (event.type()) {
                case DEVICE_ADDED:
                case DEVICE_AVAILABILITY_CHANGED:
                case DEVICE_UPDATED:
                case DEVICE_REMOVED:
                case PORT_ADDED:
                case PORT_REMOVED:
                    checkSubscription(event.subject().id());
                    break;
                default:
                    break;
            }
        }
    }
}
