/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.provider.general.device.impl;

import com.google.common.collect.Maps;
import org.onlab.packet.ChassisId;
import org.onlab.util.ItemNotFoundException;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.gnmi.api.GnmiController;
import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
import org.onosproject.net.behaviour.PiPipelineProgrammable;
import org.onosproject.net.behaviour.PortAdmin;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.basics.BasicDeviceConfig;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DeviceAgentEvent;
import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
import org.onosproject.net.device.DeviceHandshaker;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortDescription;
import org.onosproject.net.driver.Behaviour;
import org.onosproject.net.driver.DefaultDriverData;
import org.onosproject.net.driver.DefaultDriverHandler;
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverData;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.provider.general.device.impl.DeviceTaskExecutor.DeviceTaskException;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;

import java.util.Collections;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.OP_TIMEOUT_SHORT;
import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.OP_TIMEOUT_SHORT_DEFAULT;
import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_INTERVAL;
import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_INTERVAL_DEFAULT;
import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_INTERVAL;
import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_INTERVAL_DEFAULT;
import static org.slf4j.LoggerFactory.getLogger;

/**
 * Provider which uses drivers to discover devices, perform initial handshake,
 * and notify the core of disconnection events. The implementation listens for
 * events from netcfg or the drivers (via {@link DeviceAgentListener}) and
 * schedules a task for each event.
 */
@Component(immediate = true,
        property = {
                PROBE_INTERVAL + ":Integer=" + PROBE_INTERVAL_DEFAULT,
                STATS_POLL_INTERVAL + ":Integer=" + STATS_POLL_INTERVAL_DEFAULT,
                OP_TIMEOUT_SHORT + ":Integer=" + OP_TIMEOUT_SHORT_DEFAULT,
        })
public class GeneralDeviceProvider extends AbstractProvider
        implements DeviceProvider {

    private final Logger log = getLogger(getClass());

    // Application name registered with the core service on activation.
    private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
    // Scheme part of the ProviderId passed to the superclass constructor.
    private static final String URI_SCHEME = "device";
    // Identifier part of the ProviderId passed to the superclass constructor.
    private static final String DEVICE_PROVIDER_PACKAGE =
            "org.onosproject.general.provider.device";
    // Number of threads in the fixed pool backing the main task executor.
    private static final int CORE_POOL_SIZE = 10;
    // NOTE(review): not referenced in the visible part of this file;
    // presumably a placeholder for missing description values - confirm.
    private static final String UNKNOWN = "unknown";

    // Service dependencies injected by the OSGi runtime; all of them are
    // mandatory for this component to activate.
    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private DeviceProviderRegistry providerRegistry;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private ComponentConfigService componentConfigService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private NetworkConfigRegistry cfgService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private CoreService coreService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private DeviceService deviceService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private DriverService driverService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private MastershipService mastershipService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private ClusterService clusterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private PiPipeconfService pipeconfService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private PiPipeconfWatchdogService pipeconfWatchdogService;

    // FIXME: no longer general if we add a dependency to a protocol-specific
    // service. Possible solutions are: rename this provider to
    // StratumDeviceProvider, find a way to allow this provider to register for
    // protocol specific events (e.g. port events) via drivers (similar to
    // DeviceAgentListener).
    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    private GnmiController gnmiController;

    // Created and activated in activate(), deactivated and cleared in
    // deactivate().
    private GnmiDeviceStateSubscriber gnmiDeviceStateSubscriber;

    // Component configuration properties; the values below are re-read from
    // the component context in modified().

    /** Configure interval for checking device availability; default is 10 sec. */
    private int probeInterval = PROBE_INTERVAL_DEFAULT;

    /** Configure poll frequency for port status and stats; default is 10 sec. */
    private int statsPollInterval = STATS_POLL_INTERVAL_DEFAULT;

    /** Configure timeout in seconds for device operations; default is 10 sec. */
    private int opTimeoutShort = OP_TIMEOUT_SHORT_DEFAULT;

    // Handshakers on which this provider has installed its device agent
    // listener, keyed by device ID. An entry is added when a connection is
    // set up and removed on teardown or failed setup.
    private final Map<DeviceId, DeviceHandshaker> handshakersWithListeners = Maps.newConcurrentMap();
    // Time (System.currentTimeMillis()) at which the last availability or
    // reachability probe was started for each device.
    private final Map<DeviceId, Long> lastProbedAvailability = Maps.newConcurrentMap();
    // Listener instances registered in activate() and removed in deactivate()
    // (the device agent listener is added per handshaker instead).
    private final InternalPipeconfWatchdogListener pipeconfWatchdogListener = new InternalPipeconfWatchdogListener();
    private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
    private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();

    // Thread pool backing taskExecutor.
    private ExecutorService mainExecutor;
    // Executor through which all device tasks are submitted.
    private DeviceTaskExecutor<TaskType> taskExecutor;
    // Single-thread scheduler running the periodic probe task.
    private ScheduledExecutorService probeExecutor;
    // Handle of the currently scheduled periodic probe task.
    private ScheduledFuture<?> probeTask;
    private StatsPoller statsPoller;
    // Obtained when registering with the provider registry; null while the
    // component is not active.
    private DeviceProviderService providerService;

    /**
     * Creates the provider with a provider ID built from {@code URI_SCHEME}
     * and {@code DEVICE_PROVIDER_PACKAGE}.
     */
    public GeneralDeviceProvider() {
        super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
    }

    /**
     * Returns the provider service obtained when this provider registered
     * itself with the provider registry during activation.
     *
     * @return the provider service, or null if the component is not active
     */
    protected DeviceProviderService providerService() {
        return providerService;
    }

    /**
     * Activates the provider: creates the executors, registers with the
     * provider registry and the configuration services, installs the netcfg
     * and pipeconf watchdog listeners, starts the gNMI device state
     * subscriber, the periodic probe task and the stats poller, and finally
     * applies the component configuration.
     *
     * @param context component context carrying the configured properties
     */
    @Activate
    public void activate(ComponentContext context) {
        // Fixed-size pool backing the device task executor.
        mainExecutor = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
                "onos/gdp-task", "%d", log));
        taskExecutor = new DeviceTaskExecutor<>(mainExecutor);
        // Single thread used to fire the periodic probe task.
        probeExecutor = newSingleThreadScheduledExecutor(groupedThreads(
                "onos/gdp-probe", "%d", log));
        providerService = providerRegistry.register(this);
        componentConfigService.registerProperties(getClass());
        coreService.registerApplication(APP_NAME);
        cfgService.addListener(cfgListener);
        pipeconfWatchdogService.addListener(pipeconfWatchdogListener);
        gnmiDeviceStateSubscriber = new GnmiDeviceStateSubscriber(
                gnmiController, deviceService, mastershipService, providerService);
        gnmiDeviceStateSubscriber.activate();
        // Both periodic activities start with the current field values;
        // modified() below reschedules them if the configured intervals differ.
        startOrRescheduleProbeTask();
        statsPoller = new StatsPoller(deviceService, mastershipService, providerService);
        statsPoller.activate(statsPollInterval);
        modified(context);
        log.info("Started");
    }

    /**
     * Re-reads the component configuration properties and reschedules the
     * probe task and/or the stats poller when their interval has changed.
     * Does nothing when no context is given.
     *
     * @param context component context carrying the configured properties,
     *                may be null
     */
    @Modified
    public void modified(ComponentContext context) {
        if (context == null) {
            return;
        }

        final Dictionary<?, ?> props = context.getProperties();

        // Probe interval.
        final int previousProbeInterval = probeInterval;
        probeInterval = Tools.getIntegerProperty(
                props, PROBE_INTERVAL, PROBE_INTERVAL_DEFAULT);
        log.info("Configured. {} is configured to {} seconds",
                 PROBE_INTERVAL, probeInterval);

        // Stats poll interval.
        final int previousStatsPollInterval = statsPollInterval;
        statsPollInterval = Tools.getIntegerProperty(
                props, STATS_POLL_INTERVAL, STATS_POLL_INTERVAL_DEFAULT);
        log.info("Configured. {} is configured to {} seconds",
                 STATS_POLL_INTERVAL, statsPollInterval);

        // Timeout for device operations; picked up on next use, nothing to
        // reschedule.
        opTimeoutShort = Tools.getIntegerProperty(
                props, OP_TIMEOUT_SHORT, OP_TIMEOUT_SHORT_DEFAULT);
        log.info("Configured. {} is configured to {} seconds",
                 OP_TIMEOUT_SHORT, opTimeoutShort);

        final boolean probeIntervalChanged = previousProbeInterval != probeInterval;
        if (probeIntervalChanged) {
            startOrRescheduleProbeTask();
        }

        final boolean statsPollIntervalChanged = previousStatsPollInterval != statsPollInterval;
        if (statsPollIntervalChanged) {
            statsPoller.reschedule(statsPollInterval);
        }
    }

    /**
     * Deactivates the provider: stops the stats poller, the probe task and
     * both executors (waiting up to 5 seconds for each executor to terminate),
     * removes all listeners, unregisters from the configuration services and
     * the provider registry, and stops the gNMI device state subscriber.
     *
     * <p>If the calling thread is interrupted while waiting for an executor
     * to terminate, cleanup still runs to completion and the interrupt status
     * is restored before returning.
     */
    @Deactivate
    public void deactivate() {
        // Tracks an interruption received while waiting, so that the flag can
        // be restored once cleanup is done instead of being silently lost.
        boolean interrupted = false;
        // Shutdown stats poller.
        statsPoller.deactivate();
        statsPoller = null;
        // Shutdown probe executor.
        probeTask.cancel(true);
        probeTask = null;
        probeExecutor.shutdownNow();
        try {
            if (!probeExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                log.warn("probeExecutor not terminated properly");
            }
        } catch (InterruptedException e) {
            interrupted = true;
            log.warn("probeExecutor not terminated properly");
        }
        probeExecutor = null;
        // Shutdown main and task executor.
        taskExecutor.cancel();
        taskExecutor = null;
        mainExecutor.shutdownNow();
        try {
            if (!mainExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                log.warn("mainExecutor not terminated properly");
            }
        } catch (InterruptedException e) {
            interrupted = true;
            log.warn("mainExecutor not terminated properly");
        }
        mainExecutor = null;
        // Remove all device agent listeners
        handshakersWithListeners.values().forEach(h -> h.removeDeviceAgentListener(id()));
        handshakersWithListeners.clear();
        // Other cleanup.
        lastProbedAvailability.clear();
        componentConfigService.unregisterProperties(getClass(), false);
        cfgService.removeListener(cfgListener);
        pipeconfWatchdogService.removeListener(pipeconfWatchdogListener);
        providerRegistry.unregister(this);
        providerService = null;
        gnmiDeviceStateSubscriber.deactivate();
        gnmiDeviceStateSubscriber = null;
        log.info("Stopped");
        if (interrupted) {
            // Let callers further up the stack observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Schedules an availability probe task for the given device.
     *
     * @param deviceId device ID, must not be null
     */
    @Override
    public void triggerProbe(DeviceId deviceId) {
        // checkNotNull() returns its argument, or throws if it is null.
        submitTask(checkNotNull(deviceId), TaskType.PROBE_AVAILABILITY);
    }

    /**
     * Notifies the device of the mastership role of the local node.
     *
     * <p>The role reported by the mastership service takes precedence over
     * the requested one. For MASTER and STANDBY a preference value is derived
     * (0 for master, 1-based position among the backups for standby) and sent
     * together with the mastership term; if the driver does not support the
     * preference-based method, the plain role is sent instead. NONE carries
     * no preference and is always sent as a plain role.
     *
     * @param deviceId device ID
     * @param newRole  requested mastership role
     */
    @Override
    public void roleChanged(DeviceId deviceId, MastershipRole newRole) {

        final MastershipInfo mastershipInfo = mastershipService.getMastershipFor(deviceId);
        final NodeId localNodeId = clusterService.getLocalNode().id();

        final MastershipRole storedRole = mastershipInfo.getRole(localNodeId);
        if (!storedRole.equals(newRole)) {
            log.warn("Inconsistent mastership info for {}! Requested {}, but " +
                             "mastership service reports {}, will apply the latter...",
                     deviceId, newRole, storedRole);
            newRole = storedRole;
        }

        final DeviceHandshaker handshaker = getBehaviour(
                deviceId, DeviceHandshaker.class);
        if (handshaker == null) {
            log.error("Null handshaker. Unable to notify role {} to {}",
                      newRole, deviceId);
            return;
        }

        // Derive preference value.
        final int preference;
        switch (newRole) {
            case MASTER:
                preference = 0;
                break;
            case STANDBY:
                preference = mastershipInfo.backups().indexOf(localNodeId) + 1;
                if (preference == 0) {
                    // Not found in list.
                    log.error("Unable to derive mastership preference for {}, " +
                                      "requested role {} but local node ID was " +
                                      "not found among list of backup nodes " +
                                      "reported by mastership service",
                              deviceId, newRole);
                    return;
                }
                break;
            case NONE:
                // No preference for NONE, apply as is. This must go to the
                // handshaker: calling this method again with the same
                // arguments would recurse without end.
                log.info("Notifying role {} to {}", newRole, deviceId);
                handshaker.roleChanged(newRole);
                return;
            default:
                log.error("Unrecognized mastership role {}", newRole);
                return;
        }

        log.info("Notifying role {} (preference {}) for term {} to {}",
                 newRole, preference, mastershipInfo.term(), deviceId);

        try {
            handshaker.roleChanged(preference, mastershipInfo.term());
        } catch (UnsupportedOperationException e) {
            // Preference-based method not supported.
            handshaker.roleChanged(newRole);
        }
    }

    /**
     * Tells whether the device is reachable according to its handshaker.
     *
     * @param deviceId device ID
     * @return true if a handshaker exists and reports the device as
     * reachable, false otherwise
     */
    @Override
    public boolean isReachable(DeviceId deviceId) {
        final DeviceHandshaker agent = getBehaviour(
                deviceId, DeviceHandshaker.class);
        return agent != null && agent.isReachable();
    }

    /**
     * Tells whether the device is available according to its handshaker.
     * If the driver cannot answer without contacting the device, an
     * availability probe is performed instead.
     *
     * @param deviceId device ID
     * @return true if the device is available, false if it is not or if no
     * handshaker exists for it
     */
    @Override
    public boolean isAvailable(DeviceId deviceId) {
        final DeviceHandshaker agent = getBehaviour(
                deviceId, DeviceHandshaker.class);
        if (agent != null) {
            try {
                // First attempt: ask the driver without probing the device.
                return agent.isAvailable();
            } catch (UnsupportedOperationException e) {
                // Not supported by the driver, fall back to probing.
                return probeAvailability(agent);
            }
        }
        return false;
    }

    /**
     * Enables or disables a port of the given device via its
     * {@link PortAdmin} behaviour, waiting up to the configured short
     * operation timeout for the operation to complete. The request is dropped
     * with a warning if the device is unknown to the device service or does
     * not support {@link PortAdmin}.
     *
     * @param deviceId   device ID
     * @param portNumber port to modify
     * @param enable     true to enable the port, false to disable it
     */
    @Override
    public void changePortState(DeviceId deviceId, PortNumber portNumber,
                                boolean enable) {
        // Look up the device once; the device service returns null for
        // devices it does not know about.
        final Device device = deviceService.getDevice(deviceId);
        if (device == null) {
            log.warn("Device {} not found, aborting port state change",
                     deviceId);
            return;
        }
        if (!device.is(PortAdmin.class)) {
            log.warn("Missing PortAdmin behaviour on {}, aborting port state change",
                     deviceId);
            return;
        }
        final PortAdmin portAdmin = device.as(PortAdmin.class);
        final CompletableFuture<Boolean> modifyTask = enable
                ? portAdmin.enable(portNumber)
                : portAdmin.disable(portNumber);
        final String descr = (enable ? "enabling" : "disabling") + " port " + portNumber;
        getFutureWithDeadline(
                modifyTask, descr, deviceId, null, opTimeoutShort);
    }

    /**
     * Schedules a connection teardown task for the given device.
     *
     * @param deviceId device ID, must not be null
     */
    @Override
    public void triggerDisconnect(DeviceId deviceId) {
        final DeviceId target = checkNotNull(deviceId);
        log.info("Triggering disconnection of device {}", target);
        submitTask(target, TaskType.CONNECTION_TEARDOWN);
    }

    /**
     * Translates network configuration events into connection tasks.
     */
    private class InternalNetworkConfigListener implements NetworkConfigListener {
        @Override
        public void event(NetworkConfigEvent event) {
            final DeviceId deviceId = (DeviceId) event.subject();
            // Work out which task (if any) the event calls for, then submit
            // it in a single place.
            final TaskType taskType;
            switch (event.type()) {
                case CONFIG_ADDED:
                    taskType = configIsComplete(deviceId)
                            ? TaskType.CONNECTION_SETUP
                            : null;
                    break;
                case CONFIG_UPDATED:
                    taskType = (configIsComplete(deviceId) && mgmtAddrUpdated(event))
                            ? TaskType.CONNECTION_UPDATE
                            : null;
                    break;
                case CONFIG_REMOVED:
                    taskType = event.configClass().equals(BasicDeviceConfig.class)
                            ? TaskType.CONNECTION_TEARDOWN
                            : null;
                    break;
                default:
                    // Nothing to do for other event types.
                    taskType = null;
                    break;
            }
            if (taskType != null) {
                submitTask(deviceId, taskType);
            }
        }

        /**
         * Tells whether the management address differs between the previous
         * and the current configuration carried by the event.
         */
        private boolean mgmtAddrUpdated(NetworkConfigEvent event) {
            if (event.prevConfig().isPresent() && event.config().isPresent()) {
                final BasicDeviceConfig before = (BasicDeviceConfig) event.prevConfig().get();
                final BasicDeviceConfig after = (BasicDeviceConfig) event.config().get();
                return !Objects.equals(before.managementAddress(), after.managementAddress());
            }
            // Cannot compare unless both configurations are available.
            return false;
        }

        @Override
        public boolean isRelevant(NetworkConfigEvent event) {
            // Only basic device configs of devices using our scheme.
            if (!event.configClass().equals(BasicDeviceConfig.class)) {
                return false;
            }
            if (!(event.subject() instanceof DeviceId)) {
                return false;
            }
            return myScheme((DeviceId) event.subject());
        }
    }

    /**
     * Translates device agent events into tasks for the originating device.
     */
    private class InternalDeviceAgentListener implements DeviceAgentListener {
        @Override
        public void event(DeviceAgentEvent event) {
            final DeviceId deviceId = event.subject();
            final TaskType taskType;
            switch (event.type()) {
                case CHANNEL_OPEN:
                    taskType = TaskType.CHANNEL_OPEN;
                    break;
                case CHANNEL_CLOSED:
                case CHANNEL_ERROR:
                    // Errors are handled like a closed channel.
                    taskType = TaskType.CHANNEL_CLOSED;
                    break;
                case ROLE_MASTER:
                    taskType = TaskType.ROLE_MASTER;
                    break;
                case ROLE_STANDBY:
                    taskType = TaskType.ROLE_STANDBY;
                    break;
                case ROLE_NONE:
                    taskType = TaskType.ROLE_NONE;
                    break;
                case NOT_MASTER:
                    taskType = TaskType.NOT_MASTER;
                    break;
                default:
                    log.warn("Unrecognized device agent event {}", event.type());
                    return;
            }
            submitTask(deviceId, taskType);
        }
    }

    /**
     * Translates pipeconf watchdog events into pipeline tasks for devices
     * using this provider's scheme.
     */
    private class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
        @Override
        public void event(PiPipeconfWatchdogEvent event) {
            final DeviceId deviceId = event.subject();
            final TaskType taskType;
            switch (event.type()) {
                case PIPELINE_READY:
                    taskType = TaskType.PIPELINE_READY;
                    break;
                case PIPELINE_UNKNOWN:
                    taskType = TaskType.PIPELINE_NOT_READY;
                    break;
                default:
                    // Other event types are ignored.
                    return;
            }
            submitTask(deviceId, taskType);
        }

        @Override
        public boolean isRelevant(PiPipeconfWatchdogEvent event) {
            final DeviceId deviceId = event.subject();
            return myScheme(deviceId);
        }
    }

    /**
     * Schedules the periodic probe task at the current probe interval,
     * cancelling (without interrupting) any previously scheduled one. The
     * first run is triggered with no initial delay. Holds the lock on this
     * instance while executing.
     */
    private synchronized void startOrRescheduleProbeTask() {
        final ScheduledFuture<?> previous = probeTask;
        if (previous != null) {
            previous.cancel(false);
        }
        probeTask = probeExecutor.scheduleAtFixedRate(
                this::submitProbeTasks, 0, probeInterval, TimeUnit.SECONDS);
    }

    /**
     * Submits a probe task for every device in the network configuration that
     * uses this provider's scheme. This is the body of the periodic probe
     * task, so no exception is allowed to escape it.
     */
    private void submitProbeTasks() {
        // Async trigger a task for all devices in the cfg.
        log.debug("Starting probing for all devices");
        try {
            cfgService.getSubjects(DeviceId.class).stream()
                    .filter(GeneralDeviceProvider::myScheme)
                    .forEach(this::submitProbeTask);
        } catch (RuntimeException e) {
            // This method is run via scheduleAtFixedRate(): an exception
            // escaping it would silently suppress all subsequent executions,
            // i.e. periodic probing would stop until the task is rescheduled.
            log.error("Exception while submitting probe tasks", e);
        }
    }

    /**
     * Decides which task, if any, the periodic probing should submit for the
     * given device: a connection setup if no connection was ever initiated, a
     * teardown followed by a setup if the connection was lost, or an
     * availability probe unless one was started too recently.
     *
     * @param deviceId device ID
     */
    private void submitProbeTask(DeviceId deviceId) {
        final DeviceHandshaker agent = handshakersWithListeners.get(deviceId);

        if (agent == null) {
            // Device is in the config but no connection has been initiated,
            // possibly because the config event was missed.
            if (configIsComplete(deviceId)) {
                submitTask(deviceId, TaskType.CONNECTION_SETUP);
            }
            return;
        }

        if (!agent.isConnected()) {
            // A connection was initiated but the driver reports that there is
            // none, possibly because a netcfg change was missed.
            log.warn("Re-establishing lost connection to {}", deviceId);
            submitTask(deviceId, TaskType.CONNECTION_TEARDOWN);
            submitTask(deviceId, TaskType.CONNECTION_SETUP);
            return;
        }

        // Offline devices are expected to signal availability via
        // CHANNEL_OPEN or similar events, but periodic probing might still be
        // needed to stimulate some channel activity. Requiring active probing
        // of closed channels in the protocol layer could be an alternative.

        // Probe tasks might involve sending messages over the network: to
        // avoid overload, consecutive probes must be at least 1/3 of the
        // configured probeInterval apart.
        final Long previousProbe = lastProbedAvailability.get(deviceId);
        final boolean probedRecently = previousProbe != null
                && (currentTimeMillis() - previousProbe) < (probeInterval * 1000 / 3);
        if (!probedRecently) {
            submitTask(deviceId, TaskType.PROBE_AVAILABILITY);
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Dropping probe task for {} as it happened recently",
                      deviceId);
        }
    }

    /**
     * Type of tasks performed by this provider. Each type is mapped to its
     * handler by {@code taskRunnable()}.
     */
    enum TaskType {
        // Initiate a connection to a device with a complete config.
        CONNECTION_SETUP,
        // React to a change of the management address in the device config.
        CONNECTION_UPDATE,
        // Disconnect from the device and drop the related state.
        CONNECTION_TEARDOWN,
        // Pipeconf watchdog reported PIPELINE_READY; probes availability.
        PIPELINE_READY,
        // Device agent reported CHANNEL_OPEN; probes availability.
        CHANNEL_OPEN,
        // Device agent reported CHANNEL_CLOSED or CHANNEL_ERROR; marks the
        // device offline if needed.
        CHANNEL_CLOSED,
        // Pipeconf watchdog reported PIPELINE_UNKNOWN; marks the device
        // offline if needed.
        PIPELINE_NOT_READY,
        // Explicit or periodic availability probe.
        PROBE_AVAILABILITY,
        // Device agent reported the role asserted by the device; the role is
        // relayed to the core as a role reply.
        ROLE_MASTER,
        ROLE_NONE,
        ROLE_STANDBY,
        // Device agent reported that this node is not master.
        NOT_MASTER,
    }

    /**
     * Submits to the task executor the handler for the given task type and
     * device.
     *
     * @param deviceId device ID
     * @param taskType type of task to perform
     */
    private void submitTask(DeviceId deviceId, TaskType taskType) {
        final Runnable handler = taskRunnable(deviceId, taskType);
        taskExecutor.submit(deviceId, taskType, handler);
    }

    /**
     * Returns the handler to run for the given task type and device.
     *
     * @param deviceId device ID
     * @param taskType type of task
     * @return runnable invoking the handler for the task type
     * @throws IllegalArgumentException if the task type is not recognized
     */
    private Runnable taskRunnable(DeviceId deviceId, TaskType taskType) {
        switch (taskType) {
            // Connection lifecycle.
            case CONNECTION_SETUP:
                return () -> handleConnectionSetup(deviceId);
            case CONNECTION_UPDATE:
                return () -> handleConnectionUpdate(deviceId);
            case CONNECTION_TEARDOWN:
                return () -> handleConnectionTeardown(deviceId);
            // Anything hinting that the device might be ready triggers an
            // availability probe.
            case CHANNEL_OPEN:
            case PIPELINE_READY:
            case PROBE_AVAILABILITY:
                return () -> handleProbeAvailability(deviceId);
            // Anything hinting that the device is not ready marks it offline.
            case CHANNEL_CLOSED:
            case PIPELINE_NOT_READY:
                return () -> markOfflineIfNeeded(deviceId);
            // Mastership.
            case ROLE_MASTER:
                return () -> handleMastershipResponse(deviceId, MastershipRole.MASTER);
            case ROLE_STANDBY:
                return () -> handleMastershipResponse(deviceId, MastershipRole.STANDBY);
            case ROLE_NONE:
                return () -> handleMastershipResponse(deviceId, MastershipRole.NONE);
            case NOT_MASTER:
                return () -> handleNotMaster(deviceId);
            default:
                throw new IllegalArgumentException("Unrecognized task type " + taskType);
        }
    }

    /**
     * Sets up a connection to the device: binds a pipeconf if required,
     * installs the device agent listener, initiates the connection via the
     * handshaker and, on success, advertises the device (as not available)
     * and its ports to the core.
     *
     * @param deviceId device ID
     * @throws DeviceTaskException if a connection already exists or the
     *                             connection attempt fails
     */
    private void handleConnectionSetup(DeviceId deviceId) {
        assertConfig(deviceId);
        // Pipeconf binding is needed only for capable devices without one.
        bindPipeconfIfRequired(deviceId);

        final DeviceHandshaker agent = handshakerOrFail(deviceId);
        final boolean alreadySetUp = agent.isConnected()
                || handshakersWithListeners.containsKey(deviceId);
        if (alreadySetUp) {
            throw new DeviceTaskException("connection already exists");
        }

        // Listen for agent events before connecting.
        agent.addDeviceAgentListener(id(), deviceAgentListener);
        handshakersWithListeners.put(deviceId, agent);

        final Boolean connected = getFutureWithDeadline(
                agent.connect(), "initiating connection",
                deviceId, false, opTimeoutShort);
        if (connected) {
            createOrUpdateDevice(deviceId, false);
            final List<PortDescription> portDescriptions = getPortDetails(deviceId);
            providerService.updatePorts(deviceId, portDescriptions);
            // Availability is updated later, when CHANNEL_OPEN is received.
            return;
        }

        // Connection failed, undo listener registration.
        agent.removeDeviceAgentListener(id());
        handshakersWithListeners.remove(deviceId);
        throw new DeviceTaskException("connection failed");
    }

    /**
     * Handles an update of the device connection endpoints: if the driver
     * reports that no connection exists, the old connection state is torn
     * down and a new connection is set up.
     *
     * @param deviceId device ID
     */
    private void handleConnectionUpdate(DeviceId deviceId) {
        assertConfig(deviceId);
        final DeviceHandshaker agent = handshakerOrFail(deviceId);
        if (agent.isConnected()) {
            // The driver still reports a connection: perhaps the part of the
            // netcfg that changed does not affect it. Nothing to do.
            return;
        }
        // Otherwise, drop any connection state derived from the old netcfg
        // and start over.
        log.warn("Detected change of connection endpoints for {}, will " +
                         "tear down existing connection and set up a new one...",
                 deviceId);
        handleConnectionTeardown(deviceId);
        handleConnectionSetup(deviceId);
    }

    /**
     * Advertises the device to the core with the given availability, unless
     * the core already knows the device with that same availability.
     *
     * @param deviceId  device ID
     * @param available availability to advertise
     */
    private void createOrUpdateDevice(DeviceId deviceId, boolean available) {
        // Other nodes might have advertised this device before us.
        final boolean upToDate = deviceService.getDevice(deviceId) != null
                && deviceService.isAvailable(deviceId) == available;
        if (!upToDate) {
            assertConfig(deviceId);
            providerService.deviceConnected(
                    deviceId, getDeviceDescription(deviceId, available));
        }
    }

    /**
     * Records the current time as the last probe time of the device and asks
     * the handshaker to probe availability, waiting up to the configured
     * short operation timeout.
     *
     * @param handshaker handshaker of the device to probe
     * @return result of the probe, false being the default value passed to
     * the deadline helper
     */
    private boolean probeAvailability(DeviceHandshaker handshaker) {
        final DeviceId deviceId = handshaker.data().deviceId();
        lastProbedAvailability.put(deviceId, currentTimeMillis());
        return getFutureWithDeadline(
                handshaker.probeAvailability(), "probing availability",
                deviceId, false, opTimeoutShort);
    }

    /**
     * Records the current time as the last probe time of the device and asks
     * the handshaker to probe reachability, waiting up to the configured
     * short operation timeout.
     *
     * @param handshaker handshaker of the device to probe
     * @return result of the probe, false being the default value passed to
     * the deadline helper
     */
    private boolean probeReachability(DeviceHandshaker handshaker) {
        final DeviceId deviceId = handshaker.data().deviceId();
        lastProbedAvailability.put(deviceId, currentTimeMillis());
        return getFutureWithDeadline(
                handshaker.probeReachability(), "probing reachability",
                deviceId, false, opTimeoutShort);
    }

    /**
     * Signals to the core that the device is disconnected, if the core
     * currently considers it available.
     *
     * @param deviceId device ID
     * @throws DeviceTaskException if the device is not registered in the core
     */
    private void markOfflineIfNeeded(DeviceId deviceId) {
        assertDeviceRegistered(deviceId);
        final boolean currentlyAvailable = deviceService.isAvailable(deviceId);
        if (!currentlyAvailable) {
            // Already offline, nothing to do.
            return;
        }
        providerService.deviceDisconnected(deviceId);
    }

    /**
     * Probes the availability of a device registered in the core and updates
     * the core accordingly. Before probing, makes sure the role held by the
     * device matches the one expected by the mastership service, re-asserting
     * it when needed.
     *
     * @param deviceId device ID
     * @throws DeviceTaskException if the device is not registered in the core
     */
    private void handleProbeAvailability(DeviceId deviceId) {
        assertDeviceRegistered(deviceId);

        // Make sure device has a valid mastership role.
        final DeviceHandshaker handshaker = handshakerOrFail(deviceId);
        final MastershipRole deviceRole = handshaker.getRole();
        final MastershipRole expectedRole = mastershipService.getLocalRole(deviceId);
        if (expectedRole == MastershipRole.NONE || expectedRole != deviceRole) {
            // Device does NOT have a valid role...
            // (reachability is actively probed only when the handshaker does
            // not already report the device as reachable)
            if (!handshaker.isReachable() && !probeReachability(handshaker)) {
                // ...but is not reachable. There isn't much we can do.
                markOfflineIfNeeded(deviceId);
                return;
            }
            // ...and is reachable, re-assert role. If this node has no role
            // yet, request one from the mastership service first.
            roleChanged(deviceId, expectedRole == MastershipRole.NONE
                    ? mastershipService.requestRoleForSync(deviceId)
                    : expectedRole);
            try {
                // Wait for role to be notified and reachability state to be
                // updated. This should be roughly equivalent to one RTT.
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Preserve the interrupt status and give up on this probe.
                Thread.currentThread().interrupt();
                return;
            }
        }

        // Check and update availability.
        if (probeAvailability(handshakerOrFail(deviceId))) {
            // Device ready to do its job.
            createOrUpdateDevice(deviceId, true);
        } else {
            markOfflineIfNeeded(deviceId);
            if (handshaker.isReachable() && isPipelineProgrammable(deviceId)) {
                // If reachable, but not available, and pipeline programmable, there
                // is a high chance it's because the pipeline is not READY
                // (independently from what the pipeconf watchdog reports, as the
                // status there might be outdated). Encourage pipeconf watchdog to
                // perform a pipeline probe ASAP.
                pipeconfWatchdogService.triggerProbe(deviceId);
            }
        }
    }

    /**
     * Relays to the core the mastership role asserted by the device.
     *
     * @param deviceId device ID
     * @param response role asserted by the device
     * @throws DeviceTaskException if the device is not registered in the core
     */
    private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
        assertDeviceRegistered(deviceId);
        log.debug("Device {} asserted role {}", deviceId, response);
        providerService.receivedRoleReply(deviceId, response);
    }

    /**
     * Handles a device notifying that this node is not master: if the
     * mastership service says otherwise, mastership is relinquished.
     *
     * @param deviceId device ID
     * @throws DeviceTaskException if the device is not registered in the core
     */
    private void handleNotMaster(DeviceId deviceId) {
        assertDeviceRegistered(deviceId);
        if (!mastershipService.isLocalMaster(deviceId)) {
            // Mastership service agrees with the device, nothing to do.
            return;
        }
        log.warn("Device {} notified that this node is not master, " +
                         "relinquishing mastership...", deviceId);
        mastershipService.relinquishMastership(deviceId);
    }

    /**
     * Verifies that the device service knows the given device.
     *
     * @param deviceId the device to look up
     * @throws DeviceTaskException if the device service returns no device
     */
    private void assertDeviceRegistered(DeviceId deviceId) {
        final boolean registered = deviceService.getDevice(deviceId) != null;
        if (registered) {
            return;
        }
        throw new DeviceTaskException("device not registered in the core");
    }

    /**
     * Tears down the connection with a device: signals the disconnection to
     * the core when the device is known and currently available, then
     * detaches this provider's agent listener, disconnects the handshaker
     * and forgets the last probed availability.
     *
     * @param deviceId the device to disconnect from
     * @throws DeviceTaskException if no handshaker behaviour can be obtained
     */
    private void handleConnectionTeardown(DeviceId deviceId) {
        // Short-circuit keeps isAvailable() from being asked about unknown devices.
        final boolean knownAndAvailable = deviceService.getDevice(deviceId) != null
                && deviceService.isAvailable(deviceId);
        if (knownAndAvailable) {
            providerService.deviceDisconnected(deviceId);
        }

        final DeviceHandshaker agent = handshakerOrFail(deviceId);
        // Stop listening before closing the connection.
        agent.removeDeviceAgentListener(id());
        handshakersWithListeners.remove(deviceId);
        agent.disconnect();

        lastProbedAvailability.remove(deviceId);
    }

    /**
     * Binds a pipeconf to the given device unless one is already bound or
     * the device is not pipeline programmable. The pipeconf ID is resolved
     * via {@code getPipeconfId}.
     *
     * @param deviceId the device to bind
     * @throws DeviceTaskException if no pipeconf ID can be resolved
     */
    private void bindPipeconfIfRequired(DeviceId deviceId) {
        if (pipeconfService.getPipeconf(deviceId).isPresent()) {
            // Device has already a pipeconf, nothing to do.
            return;
        }
        if (!isPipelineProgrammable(deviceId)) {
            // Device is not programmable, nothing to do.
            return;
        }
        // Get pipeconf from netcfg or driver (default one).
        final PiPipelineProgrammable programmable =
                getBehaviour(deviceId, PiPipelineProgrammable.class);
        final PiPipeconfId resolvedId = getPipeconfId(deviceId, programmable);
        if (resolvedId != null) {
            // Store binding in pipeconf service.
            pipeconfService.bindToDevice(resolvedId, deviceId);
            return;
        }
        throw new DeviceTaskException("unable to find pipeconf");
    }

    /**
     * Resolves the ID of the pipeconf to use for the given device, looking
     * first at the network configuration and then at the default pipeconf
     * advertised by the driver.
     *
     * @param deviceId     the device for which to resolve the pipeconf
     * @param pipelineProg the pipeline programmable behaviour of the device,
     *                     may be null
     * @return the pipeconf ID, or null if none could be found
     */
    private PiPipeconfId getPipeconfId(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
        // Places to look for a pipeconf ID (in priority order):
        // 1) netcfg
        // 2) device driver (default one)
        final PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
        if (pipeconfId != null && !pipeconfId.id().isEmpty()) {
            return pipeconfId;
        }
        if (pipelineProg == null) {
            return null;
        }
        // Query the driver exactly once: checking isPresent() on one call and
        // get() on another could throw if the result changes in between.
        return pipelineProg.getDefaultPipeconf()
                .map(defaultPipeconf -> {
                    log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
                    return defaultPipeconf.id();
                })
                .orElse(null);
    }

    /**
     * Reads the pipeconf ID from the basic device configuration.
     *
     * @param deviceId the device whose configuration is read
     * @return a pipeconf ID built from the configured value, or null if the
     * device has no basic configuration or the configuration carries no
     * pipeconf
     */
    private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
        final BasicDeviceConfig basicCfg =
                cfgService.getConfig(deviceId, BasicDeviceConfig.class);
        if (basicCfg == null) {
            return null;
        }
        if (basicCfg.pipeconf() == null) {
            return null;
        }
        return new PiPipeconfId(basicCfg.pipeconf());
    }

    /**
     * Returns the handshaker behaviour for the given device, failing when
     * none can be obtained.
     *
     * @param deviceId the device for which the handshaker is needed
     * @return a non-null handshaker
     * @throws DeviceTaskException if the handshaker behaviour is missing
     */
    private DeviceHandshaker handshakerOrFail(DeviceId deviceId) {
        final DeviceHandshaker found = getBehaviour(deviceId, DeviceHandshaker.class);
        if (found != null) {
            return found;
        }
        throw new DeviceTaskException("missing handshaker behavior");
    }

    /**
     * Tells whether the basic device configuration exists for the given
     * device and names a driver.
     *
     * @param deviceId the device whose configuration is checked
     * @return true if a basic configuration with a non-empty driver exists
     */
    private boolean configIsComplete(DeviceId deviceId) {
        final BasicDeviceConfig cfg =
                cfgService.getConfig(deviceId, BasicDeviceConfig.class);
        if (cfg == null) {
            return false;
        }
        return !isNullOrEmpty(cfg.driver());
    }

    /**
     * Verifies that the configuration of the given device is complete.
     *
     * @param deviceId the device whose configuration is checked
     * @throws DeviceTaskException if the configuration is not complete
     */
    private void assertConfig(DeviceId deviceId) {
        if (configIsComplete(deviceId)) {
            return;
        }
        throw new DeviceTaskException("configuration is not complete");
    }

    /**
     * Looks up the driver for the given device.
     *
     * @param deviceId the device whose driver is needed
     * @return the driver, or null if the driver service cannot find one
     */
    private Driver getDriver(DeviceId deviceId) {
        Driver found = null;
        try {
            // DriverManager checks first using basic device config.
            found = driverService.getDriver(deviceId);
        } catch (ItemNotFoundException e) {
            // Absence is reported to the caller as null.
            log.error("Driver not found for {}", deviceId);
        }
        return found;
    }

    /**
     * Creates an instance of the requested behaviour for the given device,
     * backed by a fresh driver handler.
     *
     * @param deviceId the device for which the behaviour is needed
     * @param type     the behaviour class
     * @param <T>      the behaviour type
     * @return the behaviour, or null if the device has no driver or the
     * driver does not support the behaviour
     */
    private <T extends Behaviour> T getBehaviour(DeviceId deviceId, Class<T> type) {
        final Driver deviceDriver = getDriver(deviceId);
        if (deviceDriver == null || !deviceDriver.hasBehaviour(type)) {
            return null;
        }
        final DefaultDriverHandler driverHandler =
                new DefaultDriverHandler(new DefaultDriverData(deviceDriver, deviceId));
        return deviceDriver.createBehaviour(driverHandler, type);
    }

    /**
     * Tells whether the driver of the given device supports a behaviour.
     *
     * @param deviceId the device to check
     * @param type     the behaviour class
     * @return true if a driver is found and it supports the behaviour
     */
    private boolean hasBehaviour(DeviceId deviceId, Class<? extends Behaviour> type) {
        final Driver deviceDriver = getDriver(deviceId);
        return deviceDriver != null && deviceDriver.hasBehaviour(type);
    }

    /**
     * Builds the description of the given device. The description discovered
     * by the driver is preferred; when the discovery behaviour is missing or
     * returns nothing, a description is forged instead.
     *
     * @param deviceId         the device to describe
     * @param defaultAvailable the availability flag to set in the description
     * @return the device description
     */
    private DeviceDescription getDeviceDescription(
            DeviceId deviceId, boolean defaultAvailable) {
        final DeviceDescriptionDiscovery discovery = getBehaviour(
                deviceId, DeviceDescriptionDiscovery.class);
        // Get one from driver...
        final DeviceDescription discovered = discovery == null
                ? null : discovery.discoverDeviceDetails();
        if (discovered == null) {
            // ...or forge.
            return forgeDeviceDescription(deviceId, defaultAvailable);
        }
        // Enforce defaultAvailable flag over the one obtained from driver.
        return new DefaultDeviceDescription(
                discovered, defaultAvailable, discovered.annotations());
    }

    /**
     * Returns the port descriptions discovered by the driver of the given
     * device.
     *
     * @param deviceId the device whose ports are needed
     * @return whatever the discovery behaviour reports, or an empty list if
     * the device has no discovery behaviour
     */
    private List<PortDescription> getPortDetails(DeviceId deviceId) {
        final DeviceDescriptionDiscovery discovery = getBehaviour(
                deviceId, DeviceDescriptionDiscovery.class);
        if (discovery == null) {
            return Collections.emptyList();
        }
        return discovery.discoverPortDetails();
    }

    /**
     * Forges a description of type SWITCH for the given device. Manufacturer,
     * hardware version and software version come from the driver reachable
     * through the handshaker, or are UNKNOWN when no handshaker or driver is
     * available. The string field that follows them is always UNKNOWN, and
     * the annotations are empty.
     *
     * @param deviceId         the device to describe
     * @param defaultAvailable the availability flag to set in the description
     * @return the forged device description
     */
    private DeviceDescription forgeDeviceDescription(
            DeviceId deviceId, boolean defaultAvailable) {
        // Uses handshaker and provider config to get driver data.
        final DeviceHandshaker agent = getBehaviour(
                deviceId, DeviceHandshaker.class);
        final Driver agentDriver;
        if (agent == null) {
            agentDriver = null;
        } else {
            agentDriver = agent.handler().driver();
        }
        final boolean driverKnown = agentDriver != null;
        return new DefaultDeviceDescription(
                deviceId.uri(),
                Device.Type.SWITCH,
                driverKnown ? agentDriver.manufacturer() : UNKNOWN,
                driverKnown ? agentDriver.hwVersion() : UNKNOWN,
                driverKnown ? agentDriver.swVersion() : UNKNOWN,
                UNKNOWN,
                new ChassisId(),
                defaultAvailable,
                DefaultAnnotations.EMPTY);
    }

    /**
     * Tells whether the URI of the given device ID uses the scheme named by
     * {@code URI_SCHEME}.
     *
     * @param deviceId the device ID to check
     * @return true if the scheme matches, false otherwise (including when the
     * URI carries no scheme at all)
     */
    static boolean myScheme(DeviceId deviceId) {
        // Compare from the constant side: getScheme() may return null for a
        // scheme-less URI, which would otherwise cause a NullPointerException.
        return URI_SCHEME.equals(deviceId.uri().getScheme());
    }

    /**
     * Tells whether the driver of the given device supports the
     * {@link PiPipelineProgrammable} behaviour.
     *
     * @param deviceId the device to check
     * @return true if the behaviour is supported, false otherwise
     */
    private boolean isPipelineProgrammable(DeviceId deviceId) {
        return hasBehaviour(deviceId, PiPipelineProgrammable.class);
    }

    /**
     * Waits for a future to complete, for at most the given number of
     * seconds, and returns its value. Any failure is logged and turned into
     * the provided default value; if the waiting thread is interrupted, its
     * interrupt flag is set again before returning.
     *
     * @param future        the future to wait for
     * @param opDescription description of the operation, used in log messages
     * @param deviceId      the device the operation refers to, used in log
     *                      messages
     * @param defaultValue  the value to return if the future does not yield a
     *                      result
     * @param timeout       the maximum time to wait, in seconds
     * @param <U>           the result type
     * @return the future result, or the default value on interruption,
     * execution failure or timeout
     */
    private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
                                        DeviceId deviceId, U defaultValue, int timeout) {
        U outcome = defaultValue;
        try {
            outcome = future.get(timeout, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
        } catch (ExecutionException e) {
            // The trailing cause is passed as the exception argument of the log call.
            log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
        } catch (InterruptedException e) {
            log.error("Thread interrupted while {} on {}", opDescription, deviceId);
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
        return outcome;
    }
}
