/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.intentperf;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.MacAddress;
import org.onlab.util.Counter;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentListener;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.intent.PointToPointIntent;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.*;
import static org.onosproject.net.intent.IntentEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;

/**
 * Application to test sustained intent throughput.
 */
@Component(immediate = true)
@Service(value = IntentPerfInstaller.class)
public class IntentPerfInstaller {

    private final Logger log = getLogger(getClass());

    private static final int DEFAULT_NUM_WORKERS = 1;

    private static final int DEFAULT_NUM_KEYS = 40000;
    private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1000; //ms

    private static final int DEFAULT_NUM_NEIGHBORS = 0;

    private static final int START_DELAY = 5_000; // ms
    private static final int REPORT_PERIOD = 1_000; //ms

    private static final String START = "start";
    private static final String STOP = "stop";
    private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl");

    //FIXME add path length

    @Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
            label = "Number of keys (i.e. unique intents) to generate per instance")
    private int numKeys = DEFAULT_NUM_KEYS;

    //TODO implement numWorkers property
//    @Property(name = "numThreads", intValue = DEFAULT_NUM_WORKERS,
//              label = "Number of installer threads per instance")
//    private int numWokers = DEFAULT_NUM_WORKERS;

    @Property(name = "cyclePeriod", intValue = DEFAULT_GOAL_CYCLE_PERIOD,
            label = "Goal for cycle period (in ms)")
    private int cyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;

    @Property(name = "numNeighbors", intValue = DEFAULT_NUM_NEIGHBORS,
            label = "Number of neighbors to generate intents for")
    private int numNeighbors = DEFAULT_NUM_NEIGHBORS;

    @Reference(cardinality = MANDATORY_UNARY)
    protected CoreService coreService;

    @Reference(cardinality = MANDATORY_UNARY)
    protected IntentService intentService;

    @Reference(cardinality = MANDATORY_UNARY)
    protected ClusterService clusterService;

    @Reference(cardinality = MANDATORY_UNARY)
    protected DeviceService deviceService;

    @Reference(cardinality = MANDATORY_UNARY)
    protected MastershipService mastershipService;

    @Reference(cardinality = MANDATORY_UNARY)
    protected WorkPartitionService partitionService;

    @Reference(cardinality = MANDATORY_UNARY)
    protected ComponentConfigService configService;

    @Reference(cardinality = MANDATORY_UNARY)
    protected IntentPerfCollector sampleCollector;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService communicationService;

    private ExecutorService messageHandlingExecutor;

    private ExecutorService workers;
    private ApplicationId appId;
    private Listener listener;
    private boolean stopped = true;

    private Timer reportTimer;

    // FIXME this variable isn't shared properly between multiple worker threads
    private int lastKey = 0;

    private IntentPerfUi perfUi;
    private NodeId nodeId;
    private TimerTask reporterTask;

    @Activate
    public void activate(ComponentContext context) {
        configService.registerProperties(getClass());

        nodeId = clusterService.getLocalNode().id();
        appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString());

        // TODO: replace with shared timer
        reportTimer = new Timer("onos-intent-perf-reporter");
        workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));

        // TODO: replace with shared executor
        messageHandlingExecutor = Executors.newSingleThreadExecutor(
                groupedThreads("onos/perf", "command-handler"));

        communicationService.addSubscriber(CONTROL, String::new, new InternalControl(),
                                           messageHandlingExecutor);

        listener = new Listener();
        intentService.addListener(listener);

        // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
        modify(context);
    }

    @Deactivate
    public void deactivate() {
        stopTestRun();

        configService.unregisterProperties(getClass(), false);
        messageHandlingExecutor.shutdown();
        communicationService.removeSubscriber(CONTROL);

        if (listener != null) {
            reportTimer.cancel();
            intentService.removeListener(listener);
            listener = null;
            reportTimer = null;
        }
    }

    @Modified
    public void modify(ComponentContext context) {
        if (context == null) {
            logConfig("Reconfigured");
            return;
        }

        Dictionary<?, ?> properties = context.getProperties();
        int newNumKeys, newCyclePeriod, newNumNeighbors;
        try {
            String s = get(properties, "numKeys");
            newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim());

            s = get(properties, "cyclePeriod");
            newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim());

            s = get(properties, "numNeighbors");
            newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim());

        } catch (NumberFormatException | ClassCastException e) {
            log.warn("Malformed configuration detected; using defaults", e);
            newNumKeys = DEFAULT_NUM_KEYS;
            newCyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
            newNumNeighbors = DEFAULT_NUM_NEIGHBORS;
        }

        if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) {
            numKeys = newNumKeys;
            cyclePeriod = newCyclePeriod;
            numNeighbors = newNumNeighbors;
            logConfig("Reconfigured");
        }
    }

    public void start() {
        if (stopped) {
            stopped = false;
            communicationService.broadcast(START, CONTROL, str -> str.getBytes());
            startTestRun();
        }
    }

    public void stop() {
        if (!stopped) {
            communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
            stopTestRun();
        }
    }

    private void logConfig(String prefix) {
        log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
                 prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
    }

    private void startTestRun() {
        sampleCollector.clearSamples();

        // adjust numNeighbors and generate list of neighbors
        numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);

        // Schedule reporter task on report period boundary
        reporterTask = new ReporterTask();
        reportTimer.scheduleAtFixedRate(reporterTask,
                                        REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
                                        REPORT_PERIOD);

        // Submit workers
        stopped = false;
        for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
            workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
        }
        log.info("Started test run");
    }

    private void stopTestRun() {
        if (reporterTask != null) {
            reporterTask.cancel();
            reporterTask = null;
        }

        try {
            workers.awaitTermination(5 * cyclePeriod, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.warn("Failed to stop worker", e);
        }

        sampleCollector.recordSample(0, 0);
        sampleCollector.recordSample(0, 0);
        stopped = true;

        log.info("Stopped test run");
    }

    private List<NodeId> getNeighbors() {
        List<NodeId> nodes = clusterService.getNodes().stream()
                .map(ControllerNode::id)
                .collect(Collectors.toCollection(ArrayList::new));
        // sort neighbors by id
        Collections.sort(nodes, (node1, node2) ->
                node1.toString().compareTo(node2.toString()));
        // rotate the local node to index 0
        Collections.rotate(nodes, -1 * nodes.indexOf(clusterService.getLocalNode().id()));
        log.debug("neighbors (raw): {}", nodes); //TODO remove
        // generate the sub-list that will contain local node and selected neighbors
        nodes = nodes.subList(0, numNeighbors + 1);
        log.debug("neighbors: {}", nodes); //TODO remove
        return nodes;
    }

    private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) {
        // choose a random device for which this node is master
        List<Device> deviceList = devices.get(node).stream().collect(Collectors.toList());
        Device device = deviceList.get(RandomUtils.nextInt(deviceList.size()));

        //FIXME we currently ignore the path length and always use the same device
        TrafficSelector selector = DefaultTrafficSelector.builder()
                .matchEthDst(MacAddress.valueOf(mac)).build();
        TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
        ConnectPoint ingress = new ConnectPoint(device.id(), PortNumber.portNumber(1));
        ConnectPoint egress = new ConnectPoint(device.id(), PortNumber.portNumber(2));

        return PointToPointIntent.builder()
                .appId(appId)
                .key(key)
                .selector(selector)
                .treatment(treatment)
                .ingressPoint(ingress)
                .egressPoint(egress)
                .build();
    }

    /**
     * Creates a specified number of intents for testing purposes.
     *
     * @param numberOfKeys number of intents
     * @param pathLength   path depth
     * @param firstKey     first key to attempt
     * @return set of intents
     */
    private Set<Intent> createIntents(int numberOfKeys, int pathLength, int firstKey) {
        List<NodeId> neighbors = getNeighbors();

        Multimap<NodeId, Device> devices = ArrayListMultimap.create();
        deviceService.getAvailableDevices()
                .forEach(device -> devices.put(mastershipService.getMasterFor(device.id()), device));

        // ensure that we have at least one device per neighbor
        neighbors.forEach(node -> checkState(!devices.get(node).isEmpty(),
                                             "There are no devices for {}", node));

        // TODO pull this outside so that createIntent can use it
        // prefix based on node id for keys generated on this instance
        long keyPrefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;

        int maxKeysPerNode = (int) Math.ceil((double) numberOfKeys / neighbors.size());
        Multimap<NodeId, Intent> intents = ArrayListMultimap.create();

        for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
            Key key = Key.of(keyPrefix + k, appId);

            NodeId leader = partitionService.getLeader(key, Key::hash);
            if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
                // Bail if we are not sending to this node or we have enough for this node
                continue;
            }
            intents.put(leader, createIntent(key, keyPrefix + k, leader, devices));

            // Bump up the counter and remember this as the last key used.
            count++;
            lastKey = k;
            if (count % 1000 == 0) {
                log.info("Building intents... {} (attempt: {})", count, lastKey);
            }
        }
        checkState(intents.values().size() == numberOfKeys,
                   "Generated wrong number of intents");
        log.info("Created {} intents", numberOfKeys);
        intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size()));

        return Sets.newHashSet(intents.values());
    }

    // Submits intent operations.
    final class Submitter implements Runnable {

        private long lastDuration;
        private int lastCount;

        private Set<Intent> intents = Sets.newHashSet();
        private Set<Intent> submitted = Sets.newHashSet();
        private Set<Intent> withdrawn = Sets.newHashSet();

        private Submitter(Set<Intent> intents) {
            this.intents = intents;
            lastCount = numKeys / 4;
            lastDuration = 1_000; // 1 second
        }

        @Override
        public void run() {
            prime();
            while (!stopped) {
                try {
                    cycle();
                } catch (Exception e) {
                    log.warn("Exception during cycle", e);
                }
            }
            clear();
        }

        private Iterable<Intent> subset(Set<Intent> intents) {
            List<Intent> subset = Lists.newArrayList(intents);
            Collections.shuffle(subset);
            return subset.subList(0, Math.min(intents.size(), lastCount));
        }

        // Submits the specified intent.
        private void submit(Intent intent) {
            intentService.submit(intent);
            submitted.add(intent);
            withdrawn.remove(intent); //TODO could check result here...
        }

        // Withdraws the specified intent.
        private void withdraw(Intent intent) {
            intentService.withdraw(intent);
            withdrawn.add(intent);
            submitted.remove(intent); //TODO could check result here...
        }

        // Primes the cycle.
        private void prime() {
            int i = 0;
            withdrawn.addAll(intents);
            for (Intent intent : intents) {
                submit(intent);
                // only submit half of the intents to start
                if (i++ >= intents.size() / 2) {
                    break;
                }
            }
        }

        private void clear() {
            submitted.forEach(this::withdraw);
        }

        // Runs a single operation cycle.
        private void cycle() {
            //TODO consider running without rate adjustment
            adjustRates();

            long start = currentTimeMillis();
            subset(submitted).forEach(this::withdraw);
            subset(withdrawn).forEach(this::submit);
            long delta = currentTimeMillis() - start;

            if (delta > cyclePeriod * 3 || delta < 0) {
                log.warn("Cycle took {} ms", delta);
            }

            int difference = cyclePeriod - (int) delta;
            if (difference > 0) {
                delay(difference);
            }

            lastDuration = delta;
        }

        int cycleCount = 0;

        private void adjustRates() {

            int addDelta = Math.max(1000 - cycleCount, 10);
            double multRatio = Math.min(0.8 + cycleCount * 0.0002, 0.995);

            //FIXME need to iron out the rate adjustment
            //FIXME we should taper the adjustments over time
            //FIXME don't just use the lastDuration, take an average
            if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
                if (listener.requestThroughput() - listener.processedThroughput() <= 2000 && //was 500
                        lastDuration <= cyclePeriod) {
                    lastCount = Math.min(lastCount + addDelta, intents.size() / 2);
                } else {
                    lastCount *= multRatio;
                }
                log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})",
                         lastCount, lastDuration, listener.requestThroughput(), listener.processedThroughput());
            }

        }
    }

    // Event listener to monitor throughput.
    final class Listener implements IntentListener {

        private final Counter runningTotal = new Counter();
        private volatile Map<IntentEvent.Type, Counter> counters;

        private volatile double processedThroughput = 0;
        private volatile double requestThroughput = 0;

        public Listener() {
            counters = initCounters();
        }

        private Map<IntentEvent.Type, Counter> initCounters() {
            Map<IntentEvent.Type, Counter> map = Maps.newHashMap();
            for (IntentEvent.Type type : IntentEvent.Type.values()) {
                map.put(type, new Counter());
            }
            return map;
        }

        public double processedThroughput() {
            return processedThroughput;
        }

        public double requestThroughput() {
            return requestThroughput;
        }

        @Override
        public void event(IntentEvent event) {
            if (event.subject().appId().equals(appId)) {
                counters.get(event.type()).add(1);
            }
        }

        public void report() {
            Map<IntentEvent.Type, Counter> reportCounters = counters;
            counters = initCounters();

            // update running total and latest throughput
            Counter installed = reportCounters.get(INSTALLED);
            Counter withdrawn = reportCounters.get(WITHDRAWN);
            processedThroughput = installed.throughput() + withdrawn.throughput();
            runningTotal.add(installed.total() + withdrawn.total());

            Counter installReq = reportCounters.get(INSTALL_REQ);
            Counter withdrawReq = reportCounters.get(WITHDRAW_REQ);
            requestThroughput = installReq.throughput() + withdrawReq.throughput();

            // build the string to report
            StringBuilder stringBuilder = new StringBuilder();
            for (IntentEvent.Type type : IntentEvent.Type.values()) {
                Counter counter = reportCounters.get(type);
                stringBuilder.append(format("%s=%.2f;", type, counter.throughput()));
            }
            log.info("Throughput: OVERALL={}; CURRENT={}; {}",
                     format("%.2f", runningTotal.throughput()),
                     format("%.2f", processedThroughput),
                     stringBuilder);

            sampleCollector.recordSample(runningTotal.throughput(),
                                         processedThroughput);
        }
    }

    private class InternalControl implements Consumer<String> {
        @Override
        public void accept(String cmd) {
            log.info("Received command {}", cmd);
            if (cmd.equals(START)) {
                startTestRun();
            } else {
                stopTestRun();
            }
        }
    }

    private class ReporterTask extends TimerTask {
        @Override
        public void run() {
            //adjustRates(); // FIXME we currently adjust rates in the cycle thread
            listener.report();
        }
    }

}
