/*
 * Copyright 2020-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.segmentrouting.phasedrecovery.impl;

import com.google.common.collect.Sets;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceAdminService;
import org.onosproject.segmentrouting.SegmentRoutingService;
import org.onosproject.segmentrouting.phasedrecovery.api.Phase;
import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Dictionary;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.segmentrouting.phasedrecovery.api.OsgiPropertyConstants.PHASED_RECOVERY_DEFAULT;
import static org.onosproject.segmentrouting.phasedrecovery.api.OsgiPropertyConstants.PROP_PHASED_RECOVERY;

@Component(
        immediate = true,
        service = PhasedRecoveryService.class,
        property = {
                PROP_PHASED_RECOVERY + ":Boolean=" + PHASED_RECOVERY_DEFAULT
        }
)
public class PhasedRecoveryManager implements PhasedRecoveryService {
    private static final Logger log = LoggerFactory.getLogger(PhasedRecoveryManager.class);
    private static final String APP_NAME = "org.onosproject.phasedrecovery";

    // TODO Make these configurable via Component Config
    // Amount of time delayed to wait for port description (in second)
    private static final int PORT_CHECKER_INTERVAL = 1;
    // Max number of retry for port checker
    private static final int PORT_CHECKER_RETRIES = 15;
    // RoutingStableChecker interval (in second)
    private static final int ROUTING_CHECKER_DELAY = 3;
    // RoutingStableChecker timeout (in second)
    private static final int ROUTING_CHECKER_TIMEOUT = 15;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private CoreService coreService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private ComponentConfigService compCfgService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private DeviceAdminService deviceAdminService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private MastershipService mastershipService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private StorageService storageService;

    @Reference(cardinality = ReferenceCardinality.OPTIONAL)
    volatile SegmentRoutingService srService;

    /** Enabling phased recovery. */
    boolean phasedRecovery = PHASED_RECOVERY_DEFAULT;

    private ApplicationId appId;
    private ConsistentMap<DeviceId, Phase> phasedRecoveryStore;
    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(
            Runtime.getRuntime().availableProcessors(), groupedThreads("onos/sr/pr", "executor"));

    @Activate
    protected void activate(ComponentContext context) {
        appId = coreService.registerApplication(APP_NAME);

        KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
                .register(KryoNamespaces.API)
                .register(Phase.class);
        phasedRecoveryStore = storageService.<DeviceId, Phase>consistentMapBuilder()
                .withName("onos-sr-phasedrecovery")
                .withRelaxedReadConsistency()
                .withSerializer(Serializer.using(serializer.build()))
                .build();

        compCfgService.registerProperties(getClass());
        modified(context);
        log.info("Started");
    }

    @Deactivate
    protected void deactivate() {
        phasedRecoveryStore.destroy();
        compCfgService.unregisterProperties(getClass(), false);
        log.info("Stopped");
    }

    @Modified
    protected void modified(ComponentContext context) {
        Dictionary<?, ?> properties = context.getProperties();
        if (properties == null) {
            return;
        }

        String strPhasedRecovery = Tools.get(properties, PROP_PHASED_RECOVERY);
        boolean expectPhasedRecovery = Boolean.parseBoolean(strPhasedRecovery);
        if (expectPhasedRecovery != phasedRecovery) {
            phasedRecovery = expectPhasedRecovery;
            log.info("{} phased recovery", phasedRecovery ? "Enabling" : "Disabling");
        }
    }

    @Override
    public boolean isEnabled() {
        return phasedRecovery;
    }

    @Override
    public boolean init(DeviceId deviceId) {
        if (this.srService == null) {
            log.info("SegmentRoutingService is not ready");
            return false;
        }
        if (!mastershipService.isLocalMaster(deviceId)) {
            log.info("Not master of {}", deviceId);
            return false;
        }

        Phase phase = Optional.ofNullable(phasedRecoveryStore.putIfAbsent(deviceId, Phase.PENDING))
                .map(Versioned::value).orElse(null);

        if (phase != null) {
            log.info("{} has been initialized already. Skipping.", deviceId);
            return false;
        } else {
            if (phasedRecovery) {
                // Even in case of EDGE as next phase, it is better to drive the transition
                // to the next phase through the port checker. If the device is reported by
                // a non master instance the first time, ports wont be available until the next
                // port reconciliation.
                Phase nextPhase = this.srService.getPairDeviceId(deviceId).isPresent() ?
                        Phase.PAIR : Phase.EDGE;
                // Wait for the PORT_STAT before entering next phase.
                // Note: Unlikely, when the device init fails due to PORT_STATS timeout,
                //       it requires operator to manually move the device to the next phase by CLI command.
                executor.schedule(new PortChecker(deviceId, PORT_CHECKER_RETRIES, nextPhase),
                        PORT_CHECKER_INTERVAL, TimeUnit.SECONDS);
            } else {
                // We assume that all ports will be reported as enabled on devices that don't require phased recovery
                setPhase(deviceId, Phase.EDGE);
            }
            return true;
        }
    }

    @Override
    public boolean reset(DeviceId deviceId) {
        if (this.srService == null) {
            log.info("SegmentRoutingService is not ready");
            return false;
        }
        // FIXME Skip mastership checking since master will not be available when a device goes offline
        //       Improve this when persistent mastership is introduced

        Phase result = Optional.ofNullable(phasedRecoveryStore.remove(deviceId))
                .map(Versioned::value).orElse(null);
        if (result != null) {
            log.info("{} is reset", deviceId);
        }
        return result != null;
    }

    @Override
    public Map<DeviceId, Phase> getPhases() {
        return phasedRecoveryStore.asJavaMap();
    }

    @Override
    public Phase getPhase(DeviceId deviceId) {
        return Optional.ofNullable(phasedRecoveryStore.get(deviceId)).map(Versioned::value).orElse(null);
    }

    @Override
    public Phase setPhase(DeviceId deviceId, Phase newPhase) {
        if (this.srService == null) {
            log.info("SegmentRoutingService is not ready");
            return null;
        }
        if (!mastershipService.isLocalMaster(deviceId)) {
            log.info("Not master of {}", deviceId);
            return null;
        }

        return Optional.ofNullable(phasedRecoveryStore.compute(deviceId, (k, v) -> {
            if (v == null && newPhase == Phase.PENDING) {
                log.info("Initializing {}", deviceId);
                return newPhase;
            } else if (v == Phase.PENDING && newPhase == Phase.PAIR) {
                srService.initHost(deviceId);
                // RouteHandler init is intentionally skipped when phased recovery is on.
                // Edge ports remain down in this phase. Therefore, no nexthop will be discovered on the given device.
                // The flow on given device will be programmed later by hostHandler.processHostMovedEvent()
                changePairPort(deviceId, true);
                log.info("Transitioning {} from PENDING to PAIR", deviceId);
                return newPhase;
            } else if (v == Phase.PAIR && newPhase == Phase.INFRA) {
                changeInfraPorts(deviceId, true);
                srService.initRoute(deviceId);
                log.info("Transitioning {} from PAIR to INFRA", deviceId);
                monitorRoutingStability(deviceId, newPhase);
                return newPhase;
            } else if (v == Phase.INFRA && newPhase == Phase.EDGE) {
                changeEdgePorts(deviceId, true);
                log.info("Transitioning {} from INFRA to EDGE", deviceId);
                return newPhase;
            } else if (v == Phase.PENDING && newPhase == Phase.EDGE) {
                // We want to enable ports in order - even for non paired devices
                // newPhase is used to implement different behaviors in monitorRoutingStability
                srService.initHost(deviceId);
                changeInfraPorts(deviceId, true);
                srService.initRoute(deviceId);
                monitorRoutingStability(deviceId, newPhase);
                log.info("Transitioning {} from PENDING to EDGE", deviceId);
                return newPhase;
            } else {
                log.debug("Ignore illegal state transition on {} from {} to {}", deviceId, v, newPhase);
                return v;
            }
        })).map(Versioned::value).orElse(null);
    }

    // Use current phase to implement a different behavior for non-paired or infra
    // devices. For these devices we are just missing to enable the edge ports, while
    // for the paired leaves we will go the final phase.
    private void monitorRoutingStability(DeviceId deviceId, Phase currentPhase) {
        // restrict the state machine transitions
        if (currentPhase != Phase.INFRA && currentPhase != Phase.EDGE) {
            log.warn("Ignore monitorRoutingStability on {} for phase {}", deviceId, currentPhase);
            return;
        }

        CompletableFuture<Void> checkerFuture = new CompletableFuture<>();
        CompletableFuture<Void> timeoutFuture =
                Tools.completeAfter(ROUTING_CHECKER_TIMEOUT, TimeUnit.SECONDS);
        RoutingStabilityChecker checker = new RoutingStabilityChecker(checkerFuture);

        checkerFuture.runAfterEitherAsync(timeoutFuture, () -> {
            if (checkerFuture.isDone()) {
                log.info("Routing stable. Move {} to the next phase", deviceId);
            } else {
                log.info("Timeout reached. Move {} to the next phase", deviceId);
                // Mark the future as completed to signify the termination of periodical checker
                checkerFuture.complete(null);
            }
            // Paired devices. Else will be performed by infra devices
            // or non paired devices that go directly from PENDING to EDGE
            if (currentPhase == Phase.INFRA) {
                setPhase(deviceId, Phase.EDGE);
            } else {
                changeEdgePorts(deviceId, true);
            }
        });

        executor.schedule(checker, ROUTING_CHECKER_DELAY, TimeUnit.SECONDS);
    }

    @Override
    public Set<PortNumber> changeAllPorts(DeviceId deviceId, boolean enabled) {
        if (this.srService == null) {
            log.warn("SegmentRoutingService is not ready. Unable to changeAllPorts({}) to {}", deviceId, enabled);
            return Sets.newHashSet();
        }
        Set<PortNumber> portsToBeEnabled = deviceAdminService.getPorts(deviceId)
                .stream().map(Port::number).collect(Collectors.toSet());
        changePorts(deviceId, portsToBeEnabled, enabled);
        return portsToBeEnabled;
    }

    @Override
    public Set<PortNumber> changePairPort(DeviceId deviceId, boolean enabled) {
        if (this.srService == null) {
            log.warn("SegmentRoutingService is not ready. Unable to changePairPort({}) to {}", deviceId, enabled);
            return Sets.newHashSet();
        }
        Set<PortNumber> portsToBeEnabled = this.srService.getPairLocalPort(deviceId)
                .map(Sets::newHashSet).orElse(Sets.newHashSet());
        changePorts(deviceId, portsToBeEnabled, enabled);
        return portsToBeEnabled;
    }

    @Override
    public Set<PortNumber> changeInfraPorts(DeviceId deviceId, boolean enabled) {
        if (this.srService == null) {
            log.warn("SegmentRoutingService is not ready. Unable to changeInfraPorts({}) to {}", deviceId, enabled);
            return Sets.newHashSet();
        }
        Set<PortNumber> portsToBeEnabled = this.srService.getInfraPorts(deviceId);
        changePorts(deviceId, portsToBeEnabled, enabled);
        return portsToBeEnabled;
    }

    @Override
    public Set<PortNumber> changeEdgePorts(DeviceId deviceId, boolean enabled) {
        if (this.srService == null) {
            log.warn("SegmentRoutingService is not ready. Unable to changeEdgePorts({}) to {}", deviceId, enabled);
            return Sets.newHashSet();
        }
        Set<PortNumber> portsToBeEnabled = this.srService.getEdgePorts(deviceId);
        changePorts(deviceId, portsToBeEnabled, enabled);
        return portsToBeEnabled;
    }

    private void changePorts(DeviceId deviceId, Set<PortNumber> portNumbers, boolean enabled) {
        if (!portNumbers.isEmpty()) {
            log.info("{} {} on {}", enabled ? "Enabled" : "Disabled", portNumbers, deviceId);
            portNumbers.forEach(portNumber ->
                    deviceAdminService.changePortState(deviceId, portNumber, enabled));
        }
    }

    private class PortChecker implements Runnable {
        int retries;
        DeviceId deviceId;
        Phase nextPhase;

        PortChecker(DeviceId deviceId, int retries, Phase nextPhase) {
            this.deviceId = deviceId;
            this.retries = retries;
            this.nextPhase = nextPhase;
        }

        @Override
        public void run() {
            retries -= 1;
            if (retries < 0) {
                log.warn("PORT_STATS timeout. Unable to initialize {}", deviceId);
                return;
            }

            if (!deviceAdminService.getPorts(deviceId).isEmpty()) {
                log.info("{} reported PORT_STATS, moving to {}", deviceId, nextPhase);
                setPhase(deviceId, nextPhase);
                return;
            }
            log.info("{} still waiting for PORT_STATS", deviceId);
            executor.schedule(this, PORT_CHECKER_INTERVAL, TimeUnit.SECONDS);
        }
    }

    private class RoutingStabilityChecker implements Runnable {
        private final CompletableFuture<Void> future;

        RoutingStabilityChecker(CompletableFuture<Void> future) {
            this.future = future;
        }

        @Override
        public void run() {
            // Do not continue if the future has been completed
            if (future.isDone()) {
                log.trace("RouteStabilityChecker is done. Stop checking");
                return;
            }

            if (srService.isRoutingStable()) {
                log.trace("Routing is stable");
                future.complete(null);
            } else {
                log.trace("Routing is not yet stable");
                executor.schedule(this, ROUTING_CHECKER_DELAY, TimeUnit.SECONDS);
            }
        }
    }
}
