/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.cpman.impl;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.cpman.ControlLoad;
import org.onosproject.cpman.ControlLoadSnapshot;
import org.onosproject.cpman.ControlMetric;
import org.onosproject.cpman.ControlMetricType;
import org.onosproject.cpman.ControlMetricsRequest;
import org.onosproject.cpman.ControlPlaneMonitorService;
import org.onosproject.cpman.ControlResource;
import org.onosproject.cpman.ControlResourceRequest;
import org.onosproject.cpman.MetricsDatabase;
import org.onosproject.net.DeviceId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS;
import static org.onosproject.cpman.ControlResource.CPU_METRICS;
import static org.onosproject.cpman.ControlResource.DISK_METRICS;
import static org.onosproject.cpman.ControlResource.MEMORY_METRICS;
import static org.onosproject.cpman.ControlResource.NETWORK_METRICS;
import static org.onosproject.cpman.ControlResource.Type;

/**
 * Control plane monitoring service class.
 */
@Component(immediate = true)
@Service
public class ControlPlaneMonitor implements ControlPlaneMonitorService {

    private final Logger log = LoggerFactory.getLogger(getClass());
    private MetricsDatabase cpuMetrics;
    private MetricsDatabase memoryMetrics;
    private Map<DeviceId, MetricsDatabase> controlMessageMap;
    private Map<String, MetricsDatabase> diskMetricsMap;
    private Map<String, MetricsDatabase> networkMetricsMap;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService communicationService;

    private static final String DEFAULT_RESOURCE = "default";

    private static final Set RESOURCE_TYPE_SET =
            ImmutableSet.of(Type.CONTROL_MESSAGE, Type.DISK, Type.NETWORK);

    private static final MessageSubject CONTROL_STATS =
            new MessageSubject("control-plane-stats");

    private static final MessageSubject CONTROL_RESOURCE =
            new MessageSubject("control-plane-resources");

    private Map<ControlMetricType, Double> cpuBuf;
    private Map<ControlMetricType, Double> memoryBuf;
    private Map<String, Map<ControlMetricType, Double>> diskBuf;
    private Map<String, Map<ControlMetricType, Double>> networkBuf;
    private Map<DeviceId, Map<ControlMetricType, Double>> ctrlMsgBuf;

    private Map<Type, Set<String>> availableResourceMap;
    private Set<DeviceId> availableDeviceIdSet;

    private static final String METRIC_TYPE_NULL = "Control metric type cannot be null";
    private static final String RESOURCE_TYPE_NULL = "Control resource type cannot be null";

    private static final Serializer SERIALIZER = Serializer
            .using(new KryoNamespace.Builder()
                    .register(KryoNamespaces.API)
                    .register(ControlMetricsRequest.class)
                    .register(ControlResourceRequest.class)
                    .register(ControlLoadSnapshot.class)
                    .register(ControlMetricType.class)
                    .register(ControlResource.Type.class)
                    .register(TimeUnit.class)
                    .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());

    @Activate
    public void activate() {
        cpuMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.CPU, CPU_METRICS);
        memoryMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.MEMORY, MEMORY_METRICS);
        controlMessageMap = Maps.newConcurrentMap();
        diskMetricsMap = Maps.newConcurrentMap();
        networkMetricsMap = Maps.newConcurrentMap();

        cpuBuf = Maps.newConcurrentMap();
        memoryBuf = Maps.newConcurrentMap();
        diskBuf = Maps.newConcurrentMap();
        networkBuf = Maps.newConcurrentMap();
        ctrlMsgBuf = Maps.newConcurrentMap();

        availableResourceMap = Maps.newConcurrentMap();
        availableDeviceIdSet = Sets.newConcurrentHashSet();

        communicationService.<ControlMetricsRequest, ControlLoadSnapshot>addSubscriber(CONTROL_STATS,
                SERIALIZER::decode, this::handleMetricsRequest, SERIALIZER::encode);

        communicationService.<ControlResourceRequest, Set<String>>addSubscriber(CONTROL_RESOURCE,
                SERIALIZER::decode, this::handleResourceRequest, SERIALIZER::encode);

        log.info("Started");
    }

    @Deactivate
    public void deactivate() {

        // TODO: need to handle the mdb close.
        cpuBuf.clear();
        memoryBuf.clear();
        diskBuf.clear();
        networkBuf.clear();
        ctrlMsgBuf.clear();

        communicationService.removeSubscriber(CONTROL_STATS);
        communicationService.removeSubscriber(CONTROL_RESOURCE);

        log.info("Stopped");
    }

    @Override
    public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
                             Optional<DeviceId> deviceId) {
        if (deviceId.isPresent()) {

            // insert a new device entry if we cannot find any
            ctrlMsgBuf.putIfAbsent(deviceId.get(), Maps.newConcurrentMap());

            // update control message metrics
            if (CONTROL_MESSAGE_METRICS.contains(cm.metricType())) {

                if (!availableDeviceIdSet.contains(deviceId.get())) {
                    availableDeviceIdSet.add(deviceId.get());
                }

                // we will accumulate the metric value into buffer first
                ctrlMsgBuf.get(deviceId.get()).putIfAbsent(cm.metricType(),
                        (double) cm.metricValue().getLoad());

                // if buffer contains all control message metrics,
                // we simply set and update the values into MetricsDatabase.
                if (ctrlMsgBuf.get(deviceId.get()).keySet()
                        .containsAll(CONTROL_MESSAGE_METRICS)) {
                    updateControlMessages(ctrlMsgBuf.get(deviceId.get()), deviceId.get());
                    ctrlMsgBuf.clear();
                }
            }
        } else {

            // update cpu metrics
            if (CPU_METRICS.contains(cm.metricType())) {
                cpuBuf.putIfAbsent(cm.metricType(),
                        (double) cm.metricValue().getLoad());
                if (cpuBuf.keySet().containsAll(CPU_METRICS)) {
                    cpuMetrics.updateMetrics(convertMap(cpuBuf));
                    cpuBuf.clear();
                }
            }

            // update memory metrics
            if (MEMORY_METRICS.contains(cm.metricType())) {
                memoryBuf.putIfAbsent(cm.metricType(),
                        (double) cm.metricValue().getLoad());
                if (memoryBuf.keySet().containsAll(MEMORY_METRICS)) {
                    memoryMetrics.updateMetrics(convertMap(memoryBuf));
                    memoryBuf.clear();
                }
            }
        }
    }

    @Override
    public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
                             String resourceName) {
        // update disk metrics
        if (DISK_METRICS.contains(cm.metricType())) {
            diskBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());

            availableResourceMap.putIfAbsent(Type.DISK, Sets.newHashSet());
            availableResourceMap.computeIfPresent(Type.DISK, (k, v) -> {
                v.add(resourceName);
                return v;
            });

            diskBuf.get(resourceName).putIfAbsent(cm.metricType(),
                    (double) cm.metricValue().getLoad());
            if (diskBuf.get(resourceName).keySet().containsAll(DISK_METRICS)) {
                updateDiskMetrics(diskBuf.get(resourceName), resourceName);
                diskBuf.clear();
            }
        }

        // update network metrics
        if (NETWORK_METRICS.contains(cm.metricType())) {
            networkBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());

            availableResourceMap.putIfAbsent(Type.NETWORK, Sets.newHashSet());
            availableResourceMap.computeIfPresent(Type.NETWORK, (k, v) -> {
                v.add(resourceName);
                return v;
            });

            networkBuf.get(resourceName).putIfAbsent(cm.metricType(),
                    (double) cm.metricValue().getLoad());
            if (networkBuf.get(resourceName).keySet().containsAll(NETWORK_METRICS)) {
                updateNetworkMetrics(networkBuf.get(resourceName), resourceName);
                networkBuf.clear();
            }
        }
    }

    @Override
    public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
                                                          ControlMetricType type,
                                                          Optional<DeviceId> deviceId) {
        if (clusterService.getLocalNode().id().equals(nodeId)) {
            return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId)));
        } else {
            return communicationService.sendAndReceive(createMetricsRequest(type, deviceId),
                    CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
        }
    }

    @Override
    public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
                                                          ControlMetricType type,
                                                          String resourceName) {
        if (clusterService.getLocalNode().id().equals(nodeId)) {
            return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName)));
        } else {
            return communicationService.sendAndReceive(createMetricsRequest(type, resourceName),
                    CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
        }
    }

    @Override
    public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
                                                          ControlMetricType type,
                                                          int duration, TimeUnit unit,
                                                          Optional<DeviceId> deviceId) {
        if (clusterService.getLocalNode().id().equals(nodeId)) {
            return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId), duration, unit));
        } else {
            return communicationService.sendAndReceive(createMetricsRequest(type, duration, unit, deviceId),
                    CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
        }
    }

    @Override
    public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
                                                          ControlMetricType type,
                                                          int duration, TimeUnit unit,
                                                          String resourceName) {
        if (clusterService.getLocalNode().id().equals(nodeId)) {
            return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName), duration, unit));
        } else {
            return communicationService.sendAndReceive(createMetricsRequest(type, duration, unit, resourceName),
                    CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
        }
    }

    @Override
    public CompletableFuture<Set<String>> availableResources(NodeId nodeId,
                                                             Type resourceType) {
        if (clusterService.getLocalNode().id().equals(nodeId)) {
            Set<String> resources = getLocalAvailableResources(resourceType);
            return CompletableFuture.completedFuture(resources);
        } else {
            return communicationService.sendAndReceive(createResourceRequest(resourceType),
                    CONTROL_RESOURCE, SERIALIZER::encode, SERIALIZER::decode, nodeId);
        }
    }

    /**
     * Builds and returns metric database instance with given resource name,
     * resource type and metric type.
     *
     * @param resourceName resource name
     * @param resourceType resource type
     * @param metricTypes  metric type
     * @return metric database instance
     */
    private MetricsDatabase genMDbBuilder(String resourceName,
                                          Type resourceType,
                                          Set<ControlMetricType> metricTypes) {
        MetricsDatabase.Builder builder = new DefaultMetricsDatabase.Builder();
        builder.withMetricName(resourceType.toString());
        builder.withResourceName(resourceName);
        metricTypes.forEach(type -> builder.addMetricType(type.toString()));
        return builder.build();
    }

    /**
     * Updates network metrics with given metric map and resource name.
     *
     * @param metricMap    a metric map which is comprised of metric type and value
     * @param resourceName resource name
     */
    private void updateNetworkMetrics(Map<ControlMetricType, Double> metricMap,
                                      String resourceName) {
        if (!networkMetricsMap.containsKey(resourceName)) {
            networkMetricsMap.put(resourceName, genMDbBuilder(resourceName,
                    Type.NETWORK, NETWORK_METRICS));
        }
        networkMetricsMap.get(resourceName).updateMetrics(convertMap(metricMap));
    }

    /**
     * Updates disk metrics with given metric map and resource name.
     *
     * @param metricMap    a metric map which is comprised of metric type and value
     * @param resourceName resource name
     */
    private void updateDiskMetrics(Map<ControlMetricType, Double> metricMap,
                                   String resourceName) {
        if (!diskMetricsMap.containsKey(resourceName)) {
            diskMetricsMap.put(resourceName, genMDbBuilder(resourceName,
                    Type.DISK, DISK_METRICS));
        }
        diskMetricsMap.get(resourceName).updateMetrics(convertMap(metricMap));
    }

    /**
     * Updates control message metrics with given metric map and device identifier.
     *
     * @param metricMap a metric map which is comprised of metric type and value
     * @param deviceId  device identifier
     */
    private void updateControlMessages(Map<ControlMetricType, Double> metricMap,
                                       DeviceId deviceId) {
        if (!controlMessageMap.containsKey(deviceId)) {
            controlMessageMap.put(deviceId, genMDbBuilder(deviceId.toString(),
                    Type.CONTROL_MESSAGE, CONTROL_MESSAGE_METRICS));
        }
        controlMessageMap.get(deviceId).updateMetrics(convertMap(metricMap));
    }

    /**
     * Converts metric map into a new map which contains string formatted metric type as key.
     *
     * @param metricMap metric map in which ControlMetricType is key
     * @return a new map in which string formatted metric type is key
     */
    private Map<String, Double> convertMap(Map<ControlMetricType, Double> metricMap) {
        if (metricMap == null) {
            return ImmutableMap.of();
        }
        Map newMap = Maps.newConcurrentMap();
        metricMap.forEach((k, v) -> newMap.putIfAbsent(k.toString(), v));
        return newMap;
    }

    /**
     * Handles control metric request from remote node.
     *
     * @param request control metric request
     * @return completable future object of control load snapshot
     */
    private CompletableFuture<ControlLoadSnapshot>
        handleMetricsRequest(ControlMetricsRequest request) {

        checkArgument(request.getType() != null, METRIC_TYPE_NULL);

        ControlLoad load;
        if (request.getResourceName() != null && request.getUnit() != null) {
            load = getLocalLoad(request.getType(), request.getResourceName());
        } else {
            load = getLocalLoad(request.getType(), request.getDeviceId());
        }

        long average;
        if (request.getUnit() != null) {
            average = load.average(request.getDuration(), request.getUnit());
        } else {
            average = load.average();
        }
        ControlLoadSnapshot resp =
                new ControlLoadSnapshot(load.latest(), average, load.time());
        return CompletableFuture.completedFuture(resp);
    }

    /**
     * Handles control resource request from remote node.
     *
     * @param request control resource type
     * @return completable future object of control resource set
     */
    private CompletableFuture<Set<String>>
        handleResourceRequest(ControlResourceRequest request) {

        checkArgument(request.getType() != null, RESOURCE_TYPE_NULL);

        Set<String> resources = getLocalAvailableResources(request.getType());
        return CompletableFuture.completedFuture(resources);
    }

    /**
     * Generates a control metric request.
     *
     * @param type     control metric type
     * @param deviceId device identifier
     * @return control metric request instance
     */
    private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
                                                       Optional<DeviceId> deviceId) {
        return new ControlMetricsRequest(type, deviceId);
    }

    /**
     * Generates a control metric request with given projected time range.
     *
     * @param type     control metric type
     * @param duration projected time duration
     * @param unit     projected time unit
     * @param deviceId device identifier
     * @return control metric request instance
     */
    private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
                                                       int duration, TimeUnit unit,
                                                       Optional<DeviceId> deviceId) {
        return new ControlMetricsRequest(type, duration, unit, deviceId);
    }

    /**
     * Generates a control metric request.
     *
     * @param type         control metric type
     * @param resourceName resource name
     * @return control metric request instance
     */
    private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
                                                       String resourceName) {
        return new ControlMetricsRequest(type, resourceName);
    }

    /**
     * Generates a control metric request with given projected time range.
     *
     * @param type         control metric type
     * @param duration     projected time duration
     * @param unit         projected time unit
     * @param resourceName resource name
     * @return control metric request instance
     */
    private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
                                                       int duration, TimeUnit unit,
                                                       String resourceName) {
        return new ControlMetricsRequest(type, duration, unit, resourceName);
    }

    /**
     * Generates a control resource request with given resource type.
     *
     * @param type control resource type
     * @return control resource request instance
     */
    private ControlResourceRequest createResourceRequest(ControlResource.Type type) {
        return new ControlResourceRequest(type);
    }

    /**
     * Returns a snapshot of control load.
     *
     * @param cl control load
     * @return a snapshot of control load
     */
    private ControlLoadSnapshot snapshot(ControlLoad cl) {
        if (cl != null) {
            return new ControlLoadSnapshot(cl.latest(), cl.average(), cl.time());
        }
        return null;
    }

    /**
     * Returns a snapshot of control load with given projected time range.
     *
     * @param cl       control load
     * @param duration projected time duration
     * @param unit     projected time unit
     * @return a snapshot of control load
     */
    private ControlLoadSnapshot snapshot(ControlLoad cl, int duration, TimeUnit unit) {
        if (cl != null) {

            return new ControlLoadSnapshot(cl.latest(), cl.average(duration, unit),
                    cl.time(), cl.recent(duration, unit));
        }
        return null;
    }

    /**
     * Returns local control load.
     *
     * @param type     metric type
     * @param deviceId device identifier
     * @return control load
     */
    private ControlLoad getLocalLoad(ControlMetricType type,
                                     Optional<DeviceId> deviceId) {
        if (deviceId.isPresent()) {
            // returns control message stats
            if (CONTROL_MESSAGE_METRICS.contains(type) &&
                    availableDeviceIdSet.contains(deviceId.get())) {
                return new DefaultControlLoad(controlMessageMap.get(deviceId.get()), type);
            }
        } else {
            // returns controlLoad of CPU metrics
            if (CPU_METRICS.contains(type)) {
                return new DefaultControlLoad(cpuMetrics, type);
            }

            // returns memoryLoad of memory metrics
            if (MEMORY_METRICS.contains(type)) {
                return new DefaultControlLoad(memoryMetrics, type);
            }
        }
        return null;
    }

    /**
     * Returns local control load.
     *
     * @param type         metric type
     * @param resourceName resource name
     * @return control load
     */
    private ControlLoad getLocalLoad(ControlMetricType type, String resourceName) {
        NodeId localNodeId = clusterService.getLocalNode().id();

        // returns disk I/O stats
        if (DISK_METRICS.contains(type) &&
                availableResourcesSync(localNodeId, Type.DISK).contains(resourceName)) {
            return new DefaultControlLoad(diskMetricsMap.get(resourceName), type);
        }

        // returns network I/O stats
        if (NETWORK_METRICS.contains(type) &&
                availableResourcesSync(localNodeId, Type.NETWORK).contains(resourceName)) {
            return new DefaultControlLoad(networkMetricsMap.get(resourceName), type);
        }
        return null;
    }

    /**
     * Obtains the available resource list from local node.
     *
     * @param resourceType control resource type
     * @return a set of available control resource
     */
    private Set<String> getLocalAvailableResources(Type resourceType) {
        Set<String> resources = ImmutableSet.of();
        if (RESOURCE_TYPE_SET.contains(resourceType)) {
            if (Type.CONTROL_MESSAGE.equals(resourceType)) {
                resources = ImmutableSet.copyOf(availableDeviceIdSet.stream()
                        .map(DeviceId::toString).collect(Collectors.toSet()));
            } else {
                Set<String> res = availableResourceMap.get(resourceType);
                resources = res == null ? ImmutableSet.of() : res;
            }
        }
        return resources;
    }
}
