| /* |
| * Copyright 2017-present Open Networking Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.onosproject.provider.general.device.impl; |
| |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.Futures; |
| import org.onlab.packet.ChassisId; |
| import org.onlab.util.ItemNotFoundException; |
| import org.onlab.util.SharedScheduledExecutors; |
| import org.onlab.util.Tools; |
| import org.onosproject.cfg.ComponentConfigService; |
| import org.onosproject.cluster.ClusterService; |
| import org.onosproject.cluster.NodeId; |
| import org.onosproject.core.CoreService; |
| import org.onosproject.drivers.gnmi.OpenConfigGnmiDeviceDescriptionDiscovery; |
| import org.onosproject.gnmi.api.GnmiController; |
| import org.onosproject.mastership.MastershipInfo; |
| import org.onosproject.mastership.MastershipService; |
| import org.onosproject.net.DefaultAnnotations; |
| import org.onosproject.net.Device; |
| import org.onosproject.net.DeviceId; |
| import org.onosproject.net.MastershipRole; |
| import org.onosproject.net.PortNumber; |
| import org.onosproject.net.behaviour.PiPipelineProgrammable; |
| import org.onosproject.net.behaviour.PortAdmin; |
| import org.onosproject.net.config.NetworkConfigEvent; |
| import org.onosproject.net.config.NetworkConfigListener; |
| import org.onosproject.net.config.NetworkConfigRegistry; |
| import org.onosproject.net.config.basics.BasicDeviceConfig; |
| import org.onosproject.net.device.DefaultDeviceDescription; |
| import org.onosproject.net.device.DeviceAdminService; |
| import org.onosproject.net.device.DeviceAgentEvent; |
| import org.onosproject.net.device.DeviceAgentListener; |
| import org.onosproject.net.device.DeviceDescription; |
| import org.onosproject.net.device.DeviceDescriptionDiscovery; |
| import org.onosproject.net.device.DeviceEvent; |
| import org.onosproject.net.device.DeviceHandshaker; |
| import org.onosproject.net.device.DeviceListener; |
| import org.onosproject.net.device.DeviceProvider; |
| import org.onosproject.net.device.DeviceProviderRegistry; |
| import org.onosproject.net.device.DeviceProviderService; |
| import org.onosproject.net.device.PortDescription; |
| import org.onosproject.net.driver.Behaviour; |
| import org.onosproject.net.driver.DefaultDriverData; |
| import org.onosproject.net.driver.DefaultDriverHandler; |
| import org.onosproject.net.driver.Driver; |
| import org.onosproject.net.driver.DriverData; |
| import org.onosproject.net.driver.DriverService; |
| import org.onosproject.net.pi.model.PiPipeconf; |
| import org.onosproject.net.pi.model.PiPipeconfId; |
| import org.onosproject.net.pi.service.PiPipeconfService; |
| import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent; |
| import org.onosproject.net.pi.service.PiPipeconfWatchdogListener; |
| import org.onosproject.net.pi.service.PiPipeconfWatchdogService; |
| import org.onosproject.net.provider.AbstractProvider; |
| import org.onosproject.net.provider.ProviderId; |
| import org.onosproject.provider.general.device.impl.DeviceTaskExecutor.DeviceTaskException; |
| import org.osgi.service.component.ComponentContext; |
| import org.osgi.service.component.annotations.Activate; |
| import org.osgi.service.component.annotations.Component; |
| import org.osgi.service.component.annotations.Deactivate; |
| import org.osgi.service.component.annotations.Modified; |
| import org.osgi.service.component.annotations.Reference; |
| import org.osgi.service.component.annotations.ReferenceCardinality; |
| import org.slf4j.Logger; |
| |
| import java.util.Collections; |
| import java.util.Dictionary; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Strings.isNullOrEmpty; |
| import static java.lang.String.format; |
| import static java.lang.System.currentTimeMillis; |
| import static java.util.concurrent.Executors.newFixedThreadPool; |
| import static org.onlab.util.Tools.groupedThreads; |
| import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.CHECKUP_INTERVAL; |
| import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.CHECKUP_INTERVAL_DEFAULT; |
| import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.READ_PORT_ID; |
| import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.READ_PORT_ID_DEFAULT; |
| import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_INTERVAL; |
| import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_INTERVAL_DEFAULT; |
| import static org.slf4j.LoggerFactory.getLogger; |
| |
| /** |
| * Provider which uses drivers to discover devices, perform initial handshake, |
| * and notify the core of disconnection events. The implementation listens for |
| * events from netcfg or the drivers (via {@link DeviceAgentListener}) andP |
| * schedules task for each event. |
| */ |
| @Component(immediate = true, |
| property = { |
| CHECKUP_INTERVAL + ":Integer=" + CHECKUP_INTERVAL_DEFAULT, |
| STATS_POLL_INTERVAL + ":Integer=" + STATS_POLL_INTERVAL_DEFAULT, |
| READ_PORT_ID + ":Boolean=" + READ_PORT_ID_DEFAULT, |
| }) |
| public class GeneralDeviceProvider extends AbstractProvider |
| implements DeviceProvider { |
| |
| private final Logger log = getLogger(getClass()); |
| |
| private static final String APP_NAME = "org.onosproject.generaldeviceprovider"; |
| private static final String URI_SCHEME = "device"; |
| private static final String DEVICE_PROVIDER_PACKAGE = |
| "org.onosproject.general.provider.device"; |
| private static final int CORE_POOL_SIZE = 10; |
| private static final String UNKNOWN = "unknown"; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private DeviceProviderRegistry providerRegistry; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private ComponentConfigService componentConfigService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private NetworkConfigRegistry cfgService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private CoreService coreService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private DeviceAdminService deviceService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private DriverService driverService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private MastershipService mastershipService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private ClusterService clusterService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private PiPipeconfService pipeconfService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private PiPipeconfWatchdogService pipeconfWatchdogService; |
| |
| // FIXME: no longer general if we add a dependency to a protocol-specific |
| // service. Possible solutions are: rename this provider to |
| // StratumDeviceProvider, find a way to allow this provider to register for |
| // protocol specific events (e.g. port events) via drivers (similar to |
| // DeviceAgentListener). |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private GnmiController gnmiController; |
| |
| private GnmiDeviceStateSubscriber gnmiDeviceStateSubscriber; |
| |
| /** |
| * Configure interval for checking device availability; default is 10 sec. |
| */ |
| private int checkupInterval = CHECKUP_INTERVAL_DEFAULT; |
| |
| /** |
| * Configure poll frequency for port status and stats; default is 10 sec. |
| */ |
| private int statsPollInterval = STATS_POLL_INTERVAL_DEFAULT; |
| |
| /** |
| * Configure read port-id for gnmi drivers; default is false. |
| */ |
| private boolean readPortId = READ_PORT_ID_DEFAULT; |
| |
| private final Map<DeviceId, DeviceHandshaker> handshakersWithListeners = Maps.newConcurrentMap(); |
| private final Map<DeviceId, Long> lastCheckups = Maps.newConcurrentMap(); |
| private final InternalPipeconfWatchdogListener pipeconfWatchdogListener = new InternalPipeconfWatchdogListener(); |
| private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener(); |
| private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener(); |
| private final DeviceListener deviceListener = new InternalDeviceListener(); |
| |
| private ExecutorService mainExecutor; |
| private DeviceTaskExecutor<TaskType> taskExecutor; |
| private ScheduledFuture<?> checkupTask; |
| private StatsPoller statsPoller; |
| private DeviceProviderService providerService; |
| |
| public GeneralDeviceProvider() { |
| super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE)); |
| } |
| |
| protected DeviceProviderService providerService() { |
| return providerService; |
| } |
| |
| @Activate |
| public void activate(ComponentContext context) { |
| mainExecutor = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads( |
| "onos/gdp", "%d", log)); |
| taskExecutor = new DeviceTaskExecutor<>(mainExecutor); |
| providerService = providerRegistry.register(this); |
| componentConfigService.registerProperties(getClass()); |
| coreService.registerApplication(APP_NAME); |
| cfgService.addListener(cfgListener); |
| deviceService.addListener(deviceListener); |
| pipeconfWatchdogService.addListener(pipeconfWatchdogListener); |
| gnmiDeviceStateSubscriber = new GnmiDeviceStateSubscriber( |
| gnmiController, deviceService, mastershipService, providerService); |
| gnmiDeviceStateSubscriber.activate(); |
| startOrReschedulePeriodicCheckupTasks(); |
| statsPoller = new StatsPoller(deviceService, mastershipService, providerService); |
| statsPoller.activate(statsPollInterval); |
| modified(context); |
| log.info("Started"); |
| } |
| |
| @Modified |
| public void modified(ComponentContext context) { |
| if (context == null) { |
| return; |
| } |
| |
| Dictionary<?, ?> properties = context.getProperties(); |
| final int oldCheckupInterval = checkupInterval; |
| checkupInterval = Tools.getIntegerProperty( |
| properties, CHECKUP_INTERVAL, CHECKUP_INTERVAL_DEFAULT); |
| log.info("Configured. {} is configured to {} seconds", |
| CHECKUP_INTERVAL, checkupInterval); |
| final int oldStatsPollFrequency = statsPollInterval; |
| statsPollInterval = Tools.getIntegerProperty( |
| properties, STATS_POLL_INTERVAL, STATS_POLL_INTERVAL_DEFAULT); |
| log.info("Configured. {} is configured to {} seconds", |
| STATS_POLL_INTERVAL, statsPollInterval); |
| final boolean oldReaPortId = readPortId; |
| String strReadPortId = Tools.get(properties, READ_PORT_ID); |
| readPortId = Boolean.parseBoolean(strReadPortId); |
| log.info("Configured. {} is configured to {}", |
| READ_PORT_ID, readPortId); |
| |
| if (oldCheckupInterval != checkupInterval) { |
| startOrReschedulePeriodicCheckupTasks(); |
| } |
| |
| if (oldStatsPollFrequency != statsPollInterval) { |
| statsPoller.reschedule(statsPollInterval); |
| } |
| |
| if (oldReaPortId != readPortId) { |
| // FIXME temporary solution will be removed when the |
| // transition to p4rt translation is completed |
| OpenConfigGnmiDeviceDescriptionDiscovery.readPortId = readPortId; |
| } |
| } |
| |
| @Deactivate |
| public void deactivate() { |
| deviceService.removeListener(deviceListener); |
| |
| // Shutdown stats poller. |
| statsPoller.deactivate(); |
| statsPoller = null; |
| // Shutdown periodic checkup task. |
| checkupTask.cancel(false); |
| checkupTask = null; |
| // Shutdown main and task executor. |
| taskExecutor.cancel(); |
| taskExecutor = null; |
| mainExecutor.shutdownNow(); |
| try { |
| mainExecutor.awaitTermination(5, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| log.warn("connectionExecutor not terminated properly"); |
| } |
| mainExecutor = null; |
| // Remove all device agent listeners |
| handshakersWithListeners.values().forEach(h -> h.removeDeviceAgentListener(id())); |
| handshakersWithListeners.clear(); |
| // Other cleanup. |
| lastCheckups.clear(); |
| componentConfigService.unregisterProperties(getClass(), false); |
| cfgService.removeListener(cfgListener); |
| pipeconfWatchdogService.removeListener(pipeconfWatchdogListener); |
| providerRegistry.unregister(this); |
| providerService = null; |
| gnmiDeviceStateSubscriber.deactivate(); |
| gnmiDeviceStateSubscriber = null; |
| log.info("Stopped"); |
| } |
| |
| @Override |
| public void triggerProbe(DeviceId deviceId) { |
| checkNotNull(deviceId); |
| submitTask(deviceId, TaskType.CHECKUP); |
| } |
| |
| @Override |
| public void roleChanged(DeviceId deviceId, MastershipRole newRole) { |
| final MastershipInfo mastershipInfo = mastershipService.getMastershipFor(deviceId); |
| final NodeId localNodeId = clusterService.getLocalNode().id(); |
| |
| if (!mastershipInfo.getRole(localNodeId).equals(newRole)) { |
| log.warn("Inconsistent mastership info for {}! Requested {}, but " + |
| "mastership service reports {}, will apply the latter...", |
| deviceId, newRole, mastershipInfo.getRole(localNodeId)); |
| newRole = mastershipInfo.getRole(localNodeId); |
| } |
| |
| final DeviceHandshaker handshaker = getBehaviour( |
| deviceId, DeviceHandshaker.class); |
| if (handshaker == null) { |
| log.error("Null handshaker. Unable to notify role {} to {}", |
| newRole, deviceId); |
| return; |
| } |
| |
| // Derive preference value. |
| final int preference; |
| switch (newRole) { |
| case MASTER: |
| preference = 0; |
| break; |
| case STANDBY: |
| preference = mastershipInfo.backups().indexOf(localNodeId) + 1; |
| if (preference == 0) { |
| // Not found in list. |
| log.error("Unable to derive mastership preference for {}, " + |
| "requested role {} but local node ID was " + |
| "not found among list of backup nodes " + |
| "reported by mastership service", |
| deviceId, newRole); |
| return; |
| } |
| break; |
| case NONE: |
| // No preference for NONE, apply as is. |
| log.info("Notifying role {} to {}", newRole, deviceId); |
| handshaker.roleChanged(newRole); |
| return; |
| default: |
| log.error("Unrecognized mastership role {}", newRole); |
| return; |
| } |
| |
| log.info("Notifying role {} (preference {}) for term {} to {}", |
| newRole, preference, mastershipInfo.term(), deviceId); |
| |
| try { |
| handshaker.roleChanged(preference, mastershipInfo.term()); |
| } catch (UnsupportedOperationException e) { |
| // Preference-based method not supported. |
| handshaker.roleChanged(newRole); |
| } |
| } |
| |
| @Override |
| public boolean isReachable(DeviceId deviceId) { |
| final DeviceHandshaker handshaker = getBehaviour( |
| deviceId, DeviceHandshaker.class); |
| if (handshaker == null) { |
| return false; |
| } |
| return handshaker.isReachable(); |
| } |
| |
| @Override |
| public boolean isAvailable(DeviceId deviceId) { |
| final DeviceHandshaker handshaker = getBehaviour( |
| deviceId, DeviceHandshaker.class); |
| if (handshaker == null) { |
| return false; |
| } |
| try { |
| // Try without probing the device... |
| return handshaker.isAvailable(); |
| } catch (UnsupportedOperationException e) { |
| // Driver does not support that. |
| return probeAvailability(handshaker); |
| } |
| } |
| |
| @Override |
| public void changePortState(DeviceId deviceId, PortNumber portNumber, |
| boolean enable) { |
| if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) { |
| log.warn("Missing PortAdmin behaviour on {}, aborting port state change", |
| deviceId); |
| return; |
| } |
| final PortAdmin portAdmin = deviceService.getDevice(deviceId) |
| .as(PortAdmin.class); |
| final CompletableFuture<Boolean> modifyTask = enable |
| ? portAdmin.enable(portNumber) |
| : portAdmin.disable(portNumber); |
| final String descr = format("%s port %s on %s", |
| (enable ? "enable" : "disable"), |
| portNumber, deviceId); |
| modifyTask.whenComplete((success, ex) -> { |
| if (ex != null) { |
| log.error("Exception while trying to " + descr, ex); |
| } else if (!success) { |
| log.warn("Unable to " + descr); |
| } |
| }); |
| } |
| |
| @Override |
| public void triggerDisconnect(DeviceId deviceId) { |
| checkNotNull(deviceId); |
| log.info("Triggering disconnection of device {}", deviceId); |
| submitTask(deviceId, TaskType.CONNECTION_TEARDOWN); |
| } |
| |
| /** |
| * Listener for configuration events. |
| */ |
| private class InternalNetworkConfigListener implements NetworkConfigListener { |
| @Override |
| public void event(NetworkConfigEvent event) { |
| DeviceId deviceId = (DeviceId) event.subject(); |
| switch (event.type()) { |
| case CONFIG_ADDED: |
| if (configIsPresent(deviceId)) { |
| submitTask(deviceId, TaskType.CONNECTION_SETUP); |
| } |
| break; |
| case CONFIG_UPDATED: |
| if (configIsPresent(deviceId) && mgmtAddrUpdated(event)) { |
| submitTask(deviceId, TaskType.CONNECTION_UPDATE); |
| } |
| break; |
| case CONFIG_REMOVED: |
| if (event.configClass().equals(BasicDeviceConfig.class)) { |
| submitTask(deviceId, TaskType.CONNECTION_TEARDOWN); |
| } |
| break; |
| default: |
| // Ignore |
| break; |
| } |
| } |
| |
| private boolean mgmtAddrUpdated(NetworkConfigEvent event) { |
| if (!event.prevConfig().isPresent() || !event.config().isPresent()) { |
| return false; |
| } |
| final BasicDeviceConfig prev = (BasicDeviceConfig) event.prevConfig().get(); |
| final BasicDeviceConfig current = (BasicDeviceConfig) event.config().get(); |
| return !Objects.equals(prev.managementAddress(), current.managementAddress()); |
| } |
| |
| @Override |
| public boolean isRelevant(NetworkConfigEvent event) { |
| return event.configClass().equals(BasicDeviceConfig.class) && |
| (event.subject() instanceof DeviceId) && |
| myScheme((DeviceId) event.subject()); |
| } |
| } |
| |
| /** |
| * Listener for device agent events. |
| */ |
| private class InternalDeviceAgentListener implements DeviceAgentListener { |
| @Override |
| public void event(DeviceAgentEvent event) { |
| DeviceId deviceId = event.subject(); |
| switch (event.type()) { |
| case CHANNEL_OPEN: |
| submitTask(deviceId, TaskType.CHANNEL_OPEN); |
| break; |
| case CHANNEL_CLOSED: |
| case CHANNEL_ERROR: |
| submitTask(deviceId, TaskType.CHANNEL_CLOSED); |
| break; |
| case ROLE_MASTER: |
| submitTask(deviceId, TaskType.ROLE_MASTER); |
| break; |
| case ROLE_STANDBY: |
| submitTask(deviceId, TaskType.ROLE_STANDBY); |
| break; |
| case ROLE_NONE: |
| // FIXME: in case of device disconnection, agents will |
| // signal role NONE, preventing the DeviceManager to mark |
| // the device as offline, as only the master can do that. We |
| // should change the DeviceManager. For now, we disable |
| // signaling role NONE. |
| // submitTask(deviceId, TaskType.ROLE_NONE); |
| break; |
| case NOT_MASTER: |
| submitTask(deviceId, TaskType.NOT_MASTER); |
| break; |
| default: |
| log.warn("Unrecognized device agent event {}", event.type()); |
| } |
| } |
| } |
| |
| /** |
| * Pipeline event listener. |
| */ |
| private class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener { |
| @Override |
| public void event(PiPipeconfWatchdogEvent event) { |
| final DeviceId deviceId = event.subject(); |
| switch (event.type()) { |
| case PIPELINE_READY: |
| submitTask(deviceId, TaskType.PIPELINE_READY); |
| break; |
| case PIPELINE_UNKNOWN: |
| submitTask(deviceId, TaskType.PIPELINE_NOT_READY); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| @Override |
| public boolean isRelevant(PiPipeconfWatchdogEvent event) { |
| return myScheme(event.subject()); |
| } |
| } |
| |
| private void startOrReschedulePeriodicCheckupTasks() { |
| synchronized (this) { |
| if (checkupTask != null) { |
| checkupTask.cancel(false); |
| } |
| checkupTask = SharedScheduledExecutors.getPoolThreadExecutor() |
| .scheduleAtFixedRate( |
| this::submitCheckupTasksForAllDevices, |
| 1, |
| checkupInterval, |
| TimeUnit.SECONDS, |
| true); |
| } |
| } |
| |
| private void submitCheckupTasksForAllDevices() { |
| // Async trigger a task for all devices in the cfg. |
| log.debug("Submitting checkup task for all devices..."); |
| final Set<DeviceId> deviceToCheck = Sets.newHashSet(); |
| // All devices in the core and in the config that we care about. |
| deviceService.getDevices().forEach(d -> { |
| if (myScheme(d.id())) { |
| deviceToCheck.add(d.id()); |
| } |
| }); |
| cfgService.getSubjects(DeviceId.class).stream() |
| .filter(GeneralDeviceProvider::myScheme) |
| .filter(this::configIsPresent) |
| .forEach(deviceToCheck::add); |
| deviceToCheck.forEach(d -> submitTask(d, TaskType.CHECKUP)); |
| } |
| |
| /** |
| * Type of tasks performed by this provider. |
| */ |
| enum TaskType { |
| CONNECTION_SETUP, |
| CONNECTION_UPDATE, |
| CONNECTION_TEARDOWN, |
| PIPELINE_READY, |
| CHANNEL_OPEN, |
| CHANNEL_CLOSED, |
| PIPELINE_NOT_READY, |
| CHECKUP, |
| ROLE_MASTER, |
| ROLE_NONE, |
| ROLE_STANDBY, |
| NOT_MASTER, |
| } |
| |
| private void submitTask(DeviceId deviceId, TaskType taskType) { |
| taskExecutor.submit(deviceId, taskType, taskRunnable(deviceId, taskType)); |
| } |
| |
| private Runnable taskRunnable(DeviceId deviceId, TaskType taskType) { |
| switch (taskType) { |
| case CONNECTION_SETUP: |
| return () -> handleConnectionSetup(deviceId); |
| case CONNECTION_UPDATE: |
| return () -> handleConnectionUpdate(deviceId); |
| case CONNECTION_TEARDOWN: |
| return () -> handleConnectionTeardown(deviceId); |
| case CHANNEL_OPEN: |
| case CHECKUP: |
| case PIPELINE_READY: |
| return () -> doCheckupAndRepair(deviceId); |
| case CHANNEL_CLOSED: |
| case PIPELINE_NOT_READY: |
| return () -> markOfflineIfNeeded(deviceId); |
| case ROLE_MASTER: |
| return () -> handleMastershipResponse(deviceId, MastershipRole.MASTER); |
| case ROLE_STANDBY: |
| return () -> handleMastershipResponse(deviceId, MastershipRole.STANDBY); |
| case ROLE_NONE: |
| return () -> handleMastershipResponse(deviceId, MastershipRole.NONE); |
| case NOT_MASTER: |
| return () -> handleNotMaster(deviceId); |
| default: |
| throw new IllegalArgumentException("Unrecognized task type " + taskType); |
| } |
| } |
| |
| private void handleConnectionSetup(DeviceId deviceId) { |
| assertConfig(deviceId); |
| // Bind pipeconf (if any and if device is capable). |
| bindPipeconfIfRequired(deviceId); |
| // Get handshaker. |
| final DeviceHandshaker handshaker = handshakerOrFail(deviceId); |
| if (handshaker.hasConnection() || handshakersWithListeners.containsKey(deviceId)) { |
| throw new DeviceTaskException("connection already exists"); |
| } |
| // Add device agent listener. |
| handshakersWithListeners.put(deviceId, handshaker); |
| handshaker.addDeviceAgentListener(id(), deviceAgentListener); |
| // Start connection via handshaker. |
| if (!handshaker.connect()) { |
| // Failed! Remove listeners. |
| handshaker.removeDeviceAgentListener(id()); |
| handshakersWithListeners.remove(deviceId); |
| // Clean up connection state leftovers. |
| handshaker.disconnect(); |
| throw new DeviceTaskException("connection failed"); |
| } |
| createOrUpdateDevice(deviceId, false); |
| // From here we expect a CHANNEL_OPEN event to update availability. |
| } |
| |
| private void handleConnectionUpdate(DeviceId deviceId) { |
| assertConfig(deviceId); |
| final DeviceHandshaker handshaker = handshakerOrFail(deviceId); |
| if (!handshaker.hasConnection()) { |
| // If driver reports that a connection still exists, perhaps the |
| // part of the netcfg that changed does not affect the connection. |
| // Otherwise, remove any previous connection state from the old |
| // netcfg and create a new one. |
| log.warn("Detected change of connection endpoints for {}, will " + |
| "tear down existing connection and set up a new one...", |
| deviceId); |
| handleConnectionTeardown(deviceId); |
| handleConnectionSetup(deviceId); |
| } |
| } |
| |
| private void createOrUpdateDevice(DeviceId deviceId, boolean available) { |
| assertConfig(deviceId); |
| |
| if (available) { |
| // Push port descriptions. If marking online, make sure to update |
| // ports before other subsystems pick up the device event. |
| final List<PortDescription> ports = getPortDetails(deviceId); |
| providerService.updatePorts(deviceId, ports); |
| } |
| |
| DeviceDescription deviceDescription = getDeviceDescription(deviceId, available); |
| DeviceDescription storeDescription = providerService.getDeviceDescription(deviceId); |
| if (deviceService.getDevice(deviceId) != null && |
| deviceService.isAvailable(deviceId) == available && |
| storeDescription != null) { |
| /* |
| * FIXME SDFAB-650 rethink the APIs and abstractions around the DeviceStore. |
| * Device registration is a two-step process for the GDP. Initially, the device is |
| * registered with default avail. to false. Later, the checkup task will update the |
| * description with the default avail to true in order to mark it available. Today, |
| * there is only one API to mark online a device from the device provider which is |
| * deviceConnected which assumes an update on the device description. The device provider |
| * is the only one able to update the device description and we have to make sure that |
| * the default avail. is flipped to true as it is used to mark as online the device when |
| * it is created or updated. Otherwise, if an ONOS instance fails and restarts, when re-joining |
| * the cluster, it will get the device marked as offline and will not be able to update |
| * its status until it become the master. This process concurs with the markOnline done |
| * by the background thread in the DeviceManager and its the reason why we cannot just check |
| * the device availability but we need to compare also the desc. Checking here the equality, |
| * as in general we may want to upgrade the device description at run time. |
| */ |
| DeviceDescription testDeviceDescription = DefaultDeviceDescription.copyReplacingAnnotation( |
| deviceDescription, storeDescription.annotations()); |
| if (testDeviceDescription.equals(storeDescription)) { |
| return; |
| } |
| } |
| |
| providerService.deviceConnected(deviceId, deviceDescription); |
| } |
| |
| private boolean probeAvailability(DeviceHandshaker handshaker) { |
| return Futures.getUnchecked(handshaker.probeAvailability()); |
| } |
| |
| private void markOfflineIfNeeded(DeviceId deviceId) { |
| assertDeviceRegistered(deviceId); |
| if (deviceService.isAvailable(deviceId)) { |
| providerService.deviceDisconnected(deviceId); |
| } |
| } |
| |
| private void doCheckupAndRepair(DeviceId deviceId) { |
| |
| // This task should be invoked periodically for each device known by |
| // this provider, or as a consequence of events signaling potential |
| // availability changes of the device. We check that everything is in |
| // order, repair what's wrong, and eventually mark the the device as |
| // available (or not) in the core. |
| |
| if (!configIsPresent(deviceId)) { |
| // We should have a connection only for devices in the config. |
| submitTask(deviceId, TaskType.CONNECTION_TEARDOWN); |
| return; |
| } |
| |
| final DeviceHandshaker handshaker = handshakersWithListeners.get(deviceId); |
| if (handshaker == null) { |
| // Device in config but we have not initiated a connection. |
| // Perhaps we missed the config event? |
| submitTask(deviceId, TaskType.CONNECTION_SETUP); |
| return; |
| } |
| |
| // If here, we have a handshaker meaning we already connected once to |
| // the device... |
| if (!handshaker.hasConnection()) { |
| // ... but now the driver reports there is NOT a connection. |
| // Perhaps the netcfg changed and we didn't pick the event? |
| log.warn("Re-establishing lost connection to {}", deviceId); |
| submitTask(deviceId, TaskType.CONNECTION_TEARDOWN); |
| submitTask(deviceId, TaskType.CONNECTION_SETUP); |
| return; |
| } |
| |
| // If here, device should be registered in the core. |
| assertDeviceRegistered(deviceId); |
| |
| if (!handshaker.isReachable()) { |
| // Device appears to be offline. |
| markOfflineIfNeeded(deviceId); |
| // While we expect the protocol layer to implement some sort of |
| // connection backoff mechanism and to signal availability via |
| // CHANNEL_OPEN events, we stimulate some channel activity now. |
| // Trigger probe over the network and forget about it (not waiting |
| // for future to complete). If channel is ready, we expect to come |
| // back here via a CHANNEL_OPEN event. |
| handshaker.probeReachability(); |
| return; |
| } |
| |
| // If here, device is reachable. Now do mastership and availability |
| // checkups. To avoid overload of checkup tasks which might involve |
| // sending messages over the network and triggering mastership |
| // elections. We require a minimum interval of 1/3 of the configured |
| // checkupInterval between consecutive checkup tasks when the device is |
| // known to be available. |
| |
| final Long lastCheckup = lastCheckups.get(deviceId); |
| final boolean isAvailable = deviceService.isAvailable(deviceId); |
| if (isAvailable && lastCheckup != null && |
| (currentTimeMillis() - lastCheckup) < (checkupInterval * 1000 / 3)) { |
| if (log.isDebugEnabled()) { |
| log.debug("Dropping checkup task for {} as it happened recently", |
| deviceId); |
| } |
| return; |
| } |
| lastCheckups.put(deviceId, currentTimeMillis()); |
| |
| // Make sure device has a valid mastership role. |
| final MastershipRole expectedRole = mastershipService.getLocalRole(deviceId); |
| if (expectedRole == MastershipRole.NONE) { |
| log.debug("Detected invalid role ({}) for {}, waiting for mastership " + |
| "service to fix this...", |
| expectedRole, deviceId); |
| // Gentle nudge to fix things... |
| mastershipService.requestRoleForSync(deviceId); |
| return; |
| } |
| |
| final MastershipRole deviceRole = handshaker.getRole(); |
| if (expectedRole != deviceRole) { |
| // FIXME: we should be checking the mastership term as well. |
| log.debug("Detected role mismatch for {}, core expects {}, " + |
| "but device reports {}, waiting for mastership " + |
| "service to fix this...", |
| deviceId, expectedRole, deviceRole); |
| // Gentle nudge to fix things... |
| providerService.receivedRoleReply(deviceId, deviceRole); |
| return; |
| } |
| |
| // Check and update availability, which differently from reachability |
| // describes the ability of the device to forward packets. |
| if (probeAvailability(handshaker)) { |
| // Device ready to do its job. |
| createOrUpdateDevice(deviceId, true); |
| } else { |
| markOfflineIfNeeded(deviceId); |
| if (isPipelineProgrammable(deviceId)) { |
| // If reachable, but not available, and pipeline programmable, |
| // there is a high chance it's because the pipeline is not READY |
| // (independently from what the pipeconf watchdog reports, as |
| // the status there might be outdated). Encourage pipeconf |
| // watchdog to perform a pipeline probe ASAP. |
| pipeconfWatchdogService.triggerProbe(deviceId); |
| } |
| } |
| } |
| |
| private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) { |
| assertDeviceRegistered(deviceId); |
| log.debug("Device {} asserted role {}", deviceId, response); |
| providerService.receivedRoleReply(deviceId, response); |
| } |
| |
| private void handleNotMaster(DeviceId deviceId) { |
| assertDeviceRegistered(deviceId); |
| handleMastershipResponse(deviceId, handshakerOrFail(deviceId).getRole()); |
| } |
| |
| private void assertDeviceRegistered(DeviceId deviceId) { |
| if (!deviceIsRegistered(deviceId)) { |
| throw new DeviceTaskException("device not registered in the core"); |
| } |
| } |
| |
| private boolean deviceIsRegistered(DeviceId deviceId) { |
| return deviceService.getDevice(deviceId) != null; |
| } |
| |
| private void handleConnectionTeardown(DeviceId deviceId) { |
| if (deviceService.getDevice(deviceId) != null |
| && deviceService.isAvailable(deviceId)) { |
| providerService.deviceDisconnected(deviceId); |
| } |
| final DeviceHandshaker handshaker = handshakerOrFail(deviceId); |
| handshaker.removeDeviceAgentListener(id()); |
| handshakersWithListeners.remove(deviceId); |
| handshaker.disconnect(); |
| lastCheckups.remove(deviceId); |
| } |
| |
| private void bindPipeconfIfRequired(DeviceId deviceId) { |
| if (pipeconfService.getPipeconf(deviceId).isPresent() |
| || !isPipelineProgrammable(deviceId)) { |
| // Nothing to do. |
| // Device has already a pipeconf or is not programmable. |
| return; |
| } |
| // Get pipeconf from netcfg or driver (default one). |
| final PiPipelineProgrammable pipelineProg = getBehaviour( |
| deviceId, PiPipelineProgrammable.class); |
| final PiPipeconfId pipeconfId = getPipeconfId(deviceId, pipelineProg); |
| if (pipeconfId == null) { |
| throw new DeviceTaskException("unable to find pipeconf"); |
| } |
| if (!pipeconfService.getPipeconf(pipeconfId).isPresent()) { |
| throw new DeviceTaskException(format( |
| "pipeconf %s not registered", pipeconfId)); |
| } |
| // Store binding in pipeconf service. |
| pipeconfService.bindToDevice(pipeconfId, deviceId); |
| } |
| |
| private PiPipeconfId getPipeconfId(DeviceId deviceId, PiPipelineProgrammable pipelineProg) { |
| // Places to look for a pipeconf ID (in priority order)): |
| // 1) netcfg |
| // 2) device driver (default one) |
| final PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId); |
| if (pipeconfId != null && !pipeconfId.id().isEmpty()) { |
| return pipeconfId; |
| } |
| if (pipelineProg != null |
| && pipelineProg.getDefaultPipeconf().isPresent()) { |
| final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get(); |
| log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId); |
| return defaultPipeconf.id(); |
| } |
| return null; |
| } |
| |
| private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) { |
| BasicDeviceConfig config = cfgService.getConfig( |
| deviceId, BasicDeviceConfig.class); |
| if (config == null) { |
| return null; |
| } |
| return config.pipeconf() != null |
| ? new PiPipeconfId(config.pipeconf()) : null; |
| } |
| |
| private DeviceHandshaker handshakerOrFail(DeviceId deviceId) { |
| final DeviceHandshaker handshaker = getBehaviour( |
| deviceId, DeviceHandshaker.class); |
| if (handshaker == null) { |
| throw new DeviceTaskException("missing handshaker behavior"); |
| } |
| return handshaker; |
| } |
| |
| private boolean configIsPresent(DeviceId deviceId) { |
| final BasicDeviceConfig basicDeviceCfg = cfgService.getConfig( |
| deviceId, BasicDeviceConfig.class); |
| return basicDeviceCfg != null && !isNullOrEmpty(basicDeviceCfg.driver()); |
| } |
| |
| private void assertConfig(DeviceId deviceId) { |
| if (!configIsPresent(deviceId)) { |
| throw new DeviceTaskException("configuration is not complete"); |
| } |
| } |
| |
| private Driver getDriver(DeviceId deviceId) { |
| try { |
| // DriverManager checks first using basic device config. |
| return driverService.getDriver(deviceId); |
| } catch (ItemNotFoundException e) { |
| log.error("Driver not found for {}", deviceId); |
| return null; |
| } |
| } |
| |
| private <T extends Behaviour> T getBehaviour(DeviceId deviceId, Class<T> type) { |
| Driver driver = getDriver(deviceId); |
| if (driver == null) { |
| return null; |
| } |
| if (!driver.hasBehaviour(type)) { |
| return null; |
| } |
| final DriverData data = new DefaultDriverData(driver, deviceId); |
| final DefaultDriverHandler handler = new DefaultDriverHandler(data); |
| return driver.createBehaviour(handler, type); |
| } |
| |
| private boolean hasBehaviour(DeviceId deviceId, Class<? extends Behaviour> type) { |
| Driver driver = getDriver(deviceId); |
| if (driver == null) { |
| return false; |
| } |
| return driver.hasBehaviour(type); |
| } |
| |
| private DeviceDescription getDeviceDescription( |
| DeviceId deviceId, boolean defaultAvailable) { |
| // Get one from driver or forge. |
| final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour( |
| deviceId, DeviceDescriptionDiscovery.class); |
| if (deviceDiscovery == null) { |
| return forgeDeviceDescription(deviceId, defaultAvailable); |
| } |
| |
| final DeviceDescription d = deviceDiscovery.discoverDeviceDetails(); |
| if (d == null) { |
| return forgeDeviceDescription(deviceId, defaultAvailable); |
| } |
| // Enforce defaultAvailable flag over the one obtained from driver. |
| return new DefaultDeviceDescription(d, defaultAvailable, d.annotations()); |
| } |
| |
| private List<PortDescription> getPortDetails(DeviceId deviceId) { |
| final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour( |
| deviceId, DeviceDescriptionDiscovery.class); |
| if (deviceDiscovery != null) { |
| return deviceDiscovery.discoverPortDetails(); |
| } else { |
| return Collections.emptyList(); |
| } |
| } |
| |
| private DeviceDescription forgeDeviceDescription( |
| DeviceId deviceId, boolean defaultAvailable) { |
| // Uses handshaker and provider config to get driver data. |
| final DeviceHandshaker handshaker = getBehaviour( |
| deviceId, DeviceHandshaker.class); |
| final Driver driver = handshaker != null |
| ? handshaker.handler().driver() : null; |
| return new DefaultDeviceDescription( |
| deviceId.uri(), |
| Device.Type.SWITCH, |
| driver != null ? driver.manufacturer() : UNKNOWN, |
| driver != null ? driver.hwVersion() : UNKNOWN, |
| driver != null ? driver.swVersion() : UNKNOWN, |
| UNKNOWN, |
| new ChassisId(), |
| defaultAvailable, |
| DefaultAnnotations.EMPTY); |
| } |
| |
| static boolean myScheme(DeviceId deviceId) { |
| return deviceId.uri().getScheme().equals(URI_SCHEME); |
| } |
| |
| private boolean isPipelineProgrammable(DeviceId deviceId) { |
| return hasBehaviour(deviceId, PiPipelineProgrammable.class); |
| } |
| |
| private class InternalDeviceListener implements DeviceListener { |
| @Override |
| public void event(DeviceEvent event) { |
| log.info("Triggering disconnect for device {}", event.subject().id()); |
| triggerDisconnect(event.subject().id()); |
| } |
| |
| @Override |
| public boolean isRelevant(DeviceEvent event) { |
| return DeviceEvent.Type.DEVICE_REMOVED == event.type() && myScheme(event.subject().id()); |
| } |
| } |
| } |