/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.mlb;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipAdminService;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.osgi.service.component.ComponentContext;
import org.onosproject.net.region.RegionEvent;
import org.onosproject.net.region.RegionListener;
import org.onosproject.net.region.RegionService;
import org.slf4j.Logger;

import java.util.Dictionary;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;

/**
 * An app to perform automatic load balancing in response to events.  Load balancing is triggered by any change in
 * mastership (and by region changes) and is delayed by the configured schedule period, 30 seconds by default, with
 * at most one balancing run queued at a time.  All load balancing is run on a separate executor that must only
 * have one thread, due to issues that can occur if multiple balancing events run in parallel.
 */
@Component(immediate = true)
public class MastershipLoadBalancer {

    private final Logger log = getLogger(getClass());

    private static final int DEFAULT_SCHEDULE_PERIOD = 30;
    @Property(name = "schedulePeriod", intValue = DEFAULT_SCHEDULE_PERIOD,
            label = "Period to schedule balancing the mastership to be shared as evenly as by all online instances.")
    private int schedulePeriod = DEFAULT_SCHEDULE_PERIOD;

    /* Leadership topic this component contends for; only the leader of this
     * topic schedules rebalancing.
     */
    private static final String REBALANCE_MASTERSHIP = "rebalance/mastership";

    private NodeId localId;

    // True while the local node is the leader of REBALANCE_MASTERSHIP.
    private final AtomicBoolean isLeader = new AtomicBoolean(false);

    // Handle of the queued (not yet started) balance task, or null if none.
    private final AtomicReference<Future<?>> nextTask = new AtomicReference<>();

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipAdminService mastershipAdminService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected LeadershipService leadershipService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected RegionService regionService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ComponentConfigService cfgService;

    private final InnerLeadershipListener leadershipListener = new InnerLeadershipListener();

    /* This listener is used to trigger balancing for any mastership event
     * which will include switches changing state between active and inactive
     * states as well as the same variety of event occurring with ONOS nodes.
     */
    private final InnerMastershipListener mastershipListener = new InnerMastershipListener();

    /* Used to trigger balancing on region events where there was either a
     * change on the master sets of a given region or a change on the devices
     * that belong to a region.
     */
    private final InnerRegionListener regionEventListener = new InnerRegionListener();

    /* Ensures that all executions do not interfere with one another (single
     * thread) and that they are apart from each other by at least what is
     * defined as the schedulePeriod.
     */
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
            groupedThreads("MastershipLoadBalancer", "%d", log));

    /**
     * Registers the configurable properties, reads the initial configuration,
     * installs the mastership, leadership and region listeners and enters the
     * leadership contest for the rebalancing topic.
     *
     * @param context the component context
     */
    @Activate
    public void activate(ComponentContext context) {
        cfgService.registerProperties(getClass());
        modified(context);
        mastershipService.addListener(mastershipListener);
        // localId must be known before leadership events can be processed.
        localId = clusterService.getLocalNode().id();
        leadershipService.addListener(leadershipListener);
        leadershipService.runForLeadership(REBALANCE_MASTERSHIP);
        regionService.addListener(regionEventListener);
        log.info("Started");
    }

    /**
     * Unregisters properties and listeners, withdraws from the leadership
     * contest, cancels any queued balance task and shuts the executor down.
     */
    @Deactivate
    public void deactivate() {
        cfgService.unregisterProperties(getClass(), false);
        mastershipService.removeListener(mastershipListener);
        leadershipService.withdraw(REBALANCE_MASTERSHIP);
        leadershipService.removeListener(leadershipListener);
        regionService.removeListener(regionEventListener);
        cancelBalance();
        executorService.shutdown();
        log.info("Stopped");
    }

    /**
     * Re-reads the configuration and re-queues the balance task so that a
     * changed schedule period takes effect for the next run.
     *
     * @param context the component context
     */
    @Modified
    public void modified(ComponentContext context) {
        readComponentConfiguration(context);
        cancelBalance();
        scheduleBalance();
        log.info("modified");
    }

    /**
     * Updates the local leadership flag and, on a transition, schedules a
     * balance run (became leader) or cancels the queued one (lost leadership).
     *
     * @param newLeader identifier of the current leader; may be null when the
     *                  topic has no leader, which is treated as "not the
     *                  local node"
     */
    private synchronized void processLeaderChange(NodeId newLeader) {
        // Null-safe comparison: a null leader must not raise an NPE on the
        // event thread, it simply means this node is not the leader.
        boolean currLeader = Objects.equals(newLeader, localId);
        if (isLeader.getAndSet(currLeader) != currLeader) {
            if (currLeader) {
                scheduleBalance();
            } else {
                cancelBalance();
            }
        }
    }

    /**
     * Queues a balance task to run after {@code schedulePeriod} seconds, but
     * only when the local node is the leader and no task is already queued.
     * As soon as a queued task starts running it clears {@code nextTask}, so
     * another rebalancing can be queued while it executes.
     */
    private void scheduleBalance() {
        if (isLeader.get() && nextTask.get() == null) {

            Future<?> task = executorService.schedule(new BalanceTask(),
                    schedulePeriod, TimeUnit.SECONDS);

            // Another thread queued a task in the meantime; drop ours so that
            // at most one task is pending.
            if (!nextTask.compareAndSet(null, task)) {
                task.cancel(false);
            }
        }
    }

    /**
     * Task that performs one balancing run through the mastership admin
     * service.
     */
    private class BalanceTask implements Runnable {

        @Override
        public void run() {
            // nextTask is now running, free the spot so that it is possible
            // to queue up another upcoming task.
            nextTask.set(null);

            try {
                mastershipAdminService.balanceRoles();
                log.info("Completed balance roles");
            } catch (Exception e) {
                // An exception escaping a scheduled task is only stored in
                // its Future, which nobody inspects; log it here instead.
                log.warn("Unable to balance roles", e);
            }
        }
    }

    /**
     * Cancels the queued balance task, if any, without interrupting a task
     * that is already running.
     */
    private void cancelBalance() {
        Future<?> task = nextTask.getAndSet(null);
        if (task != null) {
            task.cancel(false);
        }
    }

    /**
     * Extracts properties from the component configuration context.
     *
     * @param context the component context
     */
    private void readComponentConfiguration(ComponentContext context) {
        Dictionary<?, ?> properties = context.getProperties();

        Integer newSchedulePeriod = Tools.getIntegerProperty(properties,
                                                             "schedulePeriod");
        if (newSchedulePeriod == null) {
            schedulePeriod = DEFAULT_SCHEDULE_PERIOD;
            log.info("Schedule period is not configured, default value is {}",
                     DEFAULT_SCHEDULE_PERIOD);
        } else {
            schedulePeriod = newSchedulePeriod;
            log.info("Configured. Schedule period is configured to {}", schedulePeriod);
        }
    }

    /**
     * Requests a balance run on every mastership event.
     */
    private class InnerMastershipListener implements MastershipListener {

        @Override
        public void event(MastershipEvent event) {
            scheduleBalance();
        }
    }

    /**
     * Tracks leadership changes of the rebalancing topic.
     */
    private class InnerLeadershipListener implements LeadershipEventListener {
        @Override
        public boolean isRelevant(LeadershipEvent event) {
            return REBALANCE_MASTERSHIP.equals(event.subject().topic());
        }

        @Override
        public void event(LeadershipEvent event) {
            processLeaderChange(event.subject().leaderNodeId());
        }
    }

    /**
     * Requests a balance run when a region is updated or its membership
     * changes; all other region events are ignored.
     */
    private class InnerRegionListener implements RegionListener {
        @Override
        public void event(RegionEvent event) {
            switch (event.type()) {
                case REGION_MEMBERSHIP_CHANGED:
                case REGION_UPDATED:
                    scheduleBalance();
                    break;
                default:
                    break;
            }
        }
    }
}
