| /* |
| * Copyright 2017-present Open Networking Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.onosproject.provider.general.device.impl; |
| |
| import com.google.common.collect.Maps; |
| import org.onlab.packet.ChassisId; |
| import org.onlab.util.ItemNotFoundException; |
| import org.onlab.util.Tools; |
| import org.onosproject.cfg.ComponentConfigService; |
| import org.onosproject.cluster.ClusterService; |
| import org.onosproject.cluster.NodeId; |
| import org.onosproject.core.CoreService; |
| import org.onosproject.gnmi.api.GnmiController; |
| import org.onosproject.mastership.MastershipInfo; |
| import org.onosproject.mastership.MastershipService; |
| import org.onosproject.net.DefaultAnnotations; |
| import org.onosproject.net.Device; |
| import org.onosproject.net.DeviceId; |
| import org.onosproject.net.MastershipRole; |
| import org.onosproject.net.PortNumber; |
| import org.onosproject.net.behaviour.PiPipelineProgrammable; |
| import org.onosproject.net.behaviour.PortAdmin; |
| import org.onosproject.net.config.NetworkConfigEvent; |
| import org.onosproject.net.config.NetworkConfigListener; |
| import org.onosproject.net.config.NetworkConfigRegistry; |
| import org.onosproject.net.config.basics.BasicDeviceConfig; |
| import org.onosproject.net.device.DefaultDeviceDescription; |
| import org.onosproject.net.device.DeviceAgentEvent; |
| import org.onosproject.net.device.DeviceAgentListener; |
| import org.onosproject.net.device.DeviceDescription; |
| import org.onosproject.net.device.DeviceDescriptionDiscovery; |
| import org.onosproject.net.device.DeviceHandshaker; |
| import org.onosproject.net.device.DeviceProvider; |
| import org.onosproject.net.device.DeviceProviderRegistry; |
| import org.onosproject.net.device.DeviceProviderService; |
| import org.onosproject.net.device.DeviceService; |
| import org.onosproject.net.device.PortDescription; |
| import org.onosproject.net.driver.Behaviour; |
| import org.onosproject.net.driver.DefaultDriverData; |
| import org.onosproject.net.driver.DefaultDriverHandler; |
| import org.onosproject.net.driver.Driver; |
| import org.onosproject.net.driver.DriverData; |
| import org.onosproject.net.driver.DriverService; |
| import org.onosproject.net.pi.model.PiPipeconf; |
| import org.onosproject.net.pi.model.PiPipeconfId; |
| import org.onosproject.net.pi.service.PiPipeconfService; |
| import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent; |
| import org.onosproject.net.pi.service.PiPipeconfWatchdogListener; |
| import org.onosproject.net.pi.service.PiPipeconfWatchdogService; |
| import org.onosproject.net.provider.AbstractProvider; |
| import org.onosproject.net.provider.ProviderId; |
| import org.onosproject.provider.general.device.impl.DeviceTaskExecutor.DeviceTaskException; |
| import org.osgi.service.component.ComponentContext; |
| import org.osgi.service.component.annotations.Activate; |
| import org.osgi.service.component.annotations.Component; |
| import org.osgi.service.component.annotations.Deactivate; |
| import org.osgi.service.component.annotations.Modified; |
| import org.osgi.service.component.annotations.Reference; |
| import org.osgi.service.component.annotations.ReferenceCardinality; |
| import org.slf4j.Logger; |
| |
| import java.util.Collections; |
| import java.util.Dictionary; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Strings.isNullOrEmpty; |
| import static java.lang.System.currentTimeMillis; |
| import static java.util.concurrent.Executors.newFixedThreadPool; |
| import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; |
| import static org.onlab.util.Tools.groupedThreads; |
| import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.OP_TIMEOUT_SHORT; |
| import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.OP_TIMEOUT_SHORT_DEFAULT; |
| import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_INTERVAL; |
| import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_INTERVAL_DEFAULT; |
| import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_INTERVAL; |
| import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_INTERVAL_DEFAULT; |
| import static org.slf4j.LoggerFactory.getLogger; |
| |
| /** |
| * Provider which uses drivers to discover devices, perform initial handshake, |
| * and notify the core of disconnection events. The implementation listens for |
| * events from netcfg or the drivers (via {@link DeviceAgentListener}) and |
| * schedules a task for each event. |
| */ |
| @Component(immediate = true, |
| property = { |
| PROBE_INTERVAL + ":Integer=" + PROBE_INTERVAL_DEFAULT, |
| STATS_POLL_INTERVAL + ":Integer=" + STATS_POLL_INTERVAL_DEFAULT, |
| OP_TIMEOUT_SHORT + ":Integer=" + OP_TIMEOUT_SHORT_DEFAULT, |
| }) |
| public class GeneralDeviceProvider extends AbstractProvider |
| implements DeviceProvider { |
| |
| private final Logger log = getLogger(getClass()); |
| |
| private static final String APP_NAME = "org.onosproject.generaldeviceprovider"; |
| private static final String URI_SCHEME = "device"; |
| private static final String DEVICE_PROVIDER_PACKAGE = |
| "org.onosproject.general.provider.device"; |
| private static final int CORE_POOL_SIZE = 10; |
| private static final String UNKNOWN = "unknown"; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private DeviceProviderRegistry providerRegistry; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private ComponentConfigService componentConfigService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private NetworkConfigRegistry cfgService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private CoreService coreService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private DeviceService deviceService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private DriverService driverService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private MastershipService mastershipService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private ClusterService clusterService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private PiPipeconfService pipeconfService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private PiPipeconfWatchdogService pipeconfWatchdogService; |
| |
| // FIXME: no longer general if we add a dependency to a protocol-specific |
| // service. Possible solutions are: rename this provider to |
| // StratumDeviceProvider, find a way to allow this provider to register for |
| // protocol specific events (e.g. port events) via drivers (similar to |
| // DeviceAgentListener). |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private GnmiController gnmiController; |
| |
| private GnmiDeviceStateSubscriber gnmiDeviceStateSubscriber; |
| |
| /** Configure interval for checking device availability; default is 10 sec. */ |
| private int probeInterval = PROBE_INTERVAL_DEFAULT; |
| |
| /** Configure poll frequency for port status and stats; default is 10 sec. */ |
| private int statsPollInterval = STATS_POLL_INTERVAL_DEFAULT; |
| |
| /** Configure timeout in seconds for device operations; default is 10 sec. */ |
| private int opTimeoutShort = OP_TIMEOUT_SHORT_DEFAULT; |
| |
| private final Map<DeviceId, DeviceHandshaker> handshakersWithListeners = Maps.newConcurrentMap(); |
| private final Map<DeviceId, Long> lastProbedAvailability = Maps.newConcurrentMap(); |
| private final InternalPipeconfWatchdogListener pipeconfWatchdogListener = new InternalPipeconfWatchdogListener(); |
| private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener(); |
| private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener(); |
| |
| private ExecutorService mainExecutor; |
| private DeviceTaskExecutor<TaskType> taskExecutor; |
| private ScheduledExecutorService probeExecutor; |
| private ScheduledFuture<?> probeTask; |
| private StatsPoller statsPoller; |
| private DeviceProviderService providerService; |
| |
/**
 * Creates the provider, identifying it with a {@link ProviderId} built from
 * the URI scheme and the provider package name constants of this class.
 */
public GeneralDeviceProvider() {
    super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
}
| |
/**
 * Returns the provider service obtained from the registry during
 * activation.
 *
 * @return the provider service; {@code null} before activation and after
 * deactivation, since the field is only set in between
 */
protected DeviceProviderService providerService() {
    return providerService;
}
| |
| @Activate |
| public void activate(ComponentContext context) { |
| mainExecutor = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads( |
| "onos/gdp-task", "%d", log)); |
| taskExecutor = new DeviceTaskExecutor<>(mainExecutor); |
| probeExecutor = newSingleThreadScheduledExecutor(groupedThreads( |
| "onos/gdp-probe", "%d", log)); |
| providerService = providerRegistry.register(this); |
| componentConfigService.registerProperties(getClass()); |
| coreService.registerApplication(APP_NAME); |
| cfgService.addListener(cfgListener); |
| pipeconfWatchdogService.addListener(pipeconfWatchdogListener); |
| gnmiDeviceStateSubscriber = new GnmiDeviceStateSubscriber( |
| gnmiController, deviceService, mastershipService, providerService); |
| gnmiDeviceStateSubscriber.activate(); |
| startOrRescheduleProbeTask(); |
| statsPoller = new StatsPoller(deviceService, mastershipService, providerService); |
| statsPoller.activate(statsPollInterval); |
| modified(context); |
| log.info("Started"); |
| } |
| |
| @Modified |
| public void modified(ComponentContext context) { |
| if (context == null) { |
| return; |
| } |
| |
| Dictionary<?, ?> properties = context.getProperties(); |
| final int oldProbeFrequency = probeInterval; |
| probeInterval = Tools.getIntegerProperty( |
| properties, PROBE_INTERVAL, PROBE_INTERVAL_DEFAULT); |
| log.info("Configured. {} is configured to {} seconds", |
| PROBE_INTERVAL, probeInterval); |
| final int oldStatsPollFrequency = statsPollInterval; |
| statsPollInterval = Tools.getIntegerProperty( |
| properties, STATS_POLL_INTERVAL, STATS_POLL_INTERVAL_DEFAULT); |
| log.info("Configured. {} is configured to {} seconds", |
| STATS_POLL_INTERVAL, statsPollInterval); |
| opTimeoutShort = Tools.getIntegerProperty( |
| properties, OP_TIMEOUT_SHORT, OP_TIMEOUT_SHORT_DEFAULT); |
| log.info("Configured. {} is configured to {} seconds", |
| OP_TIMEOUT_SHORT, opTimeoutShort); |
| |
| if (oldProbeFrequency != probeInterval) { |
| startOrRescheduleProbeTask(); |
| } |
| |
| if (oldStatsPollFrequency != statsPollInterval) { |
| statsPoller.reschedule(statsPollInterval); |
| } |
| } |
| |
| @Deactivate |
| public void deactivate() { |
| // Shutdown stats poller. |
| statsPoller.deactivate(); |
| statsPoller = null; |
| // Shutdown probe executor. |
| probeTask.cancel(true); |
| probeTask = null; |
| probeExecutor.shutdownNow(); |
| try { |
| probeExecutor.awaitTermination(5, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| log.warn("probeExecutor not terminated properly"); |
| } |
| probeExecutor = null; |
| // Shutdown main and task executor. |
| taskExecutor.cancel(); |
| taskExecutor = null; |
| mainExecutor.shutdownNow(); |
| try { |
| mainExecutor.awaitTermination(5, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| log.warn("connectionExecutor not terminated properly"); |
| } |
| mainExecutor = null; |
| // Remove all device agent listeners |
| handshakersWithListeners.values().forEach(h -> h.removeDeviceAgentListener(id())); |
| handshakersWithListeners.clear(); |
| // Other cleanup. |
| lastProbedAvailability.clear(); |
| componentConfigService.unregisterProperties(getClass(), false); |
| cfgService.removeListener(cfgListener); |
| pipeconfWatchdogService.removeListener(pipeconfWatchdogListener); |
| providerRegistry.unregister(this); |
| providerService = null; |
| gnmiDeviceStateSubscriber.deactivate(); |
| gnmiDeviceStateSubscriber = null; |
| log.info("Stopped"); |
| } |
| |
| @Override |
| public void triggerProbe(DeviceId deviceId) { |
| checkNotNull(deviceId); |
| submitTask(deviceId, TaskType.PROBE_AVAILABILITY); |
| } |
| |
| @Override |
| public void roleChanged(DeviceId deviceId, MastershipRole newRole) { |
| |
| final MastershipInfo mastershipInfo = mastershipService.getMastershipFor(deviceId); |
| final NodeId localNodeId = clusterService.getLocalNode().id(); |
| |
| if (!mastershipInfo.getRole(localNodeId).equals(newRole)) { |
| log.warn("Inconsistent mastership info for {}! Requested {}, but " + |
| "mastership service reports {}, will apply the latter...", |
| deviceId, newRole, mastershipInfo.getRole(localNodeId)); |
| newRole = mastershipInfo.getRole(localNodeId); |
| } |
| |
| // Derive preference value. |
| final int preference; |
| switch (newRole) { |
| case MASTER: |
| preference = 0; |
| break; |
| case STANDBY: |
| preference = mastershipInfo.backups().indexOf(localNodeId) + 1; |
| if (preference == 0) { |
| // Not found in list. |
| log.error("Unable to derive mastership preference for {}, " + |
| "requested role {} but local node ID was " + |
| "not found among list of backup nodes " + |
| "reported by mastership service"); |
| return; |
| } |
| break; |
| case NONE: |
| // No preference for NONE, apply as is. |
| log.info("Notifying role {} to {}", newRole, deviceId); |
| roleChanged(deviceId, newRole); |
| return; |
| default: |
| log.error("Unrecognized mastership role {}", newRole); |
| return; |
| } |
| |
| log.info("Notifying role {} (preference {}) for term {} to {}", |
| newRole, preference, mastershipInfo.term(), deviceId); |
| |
| final DeviceHandshaker handshaker = getBehaviour( |
| deviceId, DeviceHandshaker.class); |
| if (handshaker == null) { |
| log.error("Null handshaker. Unable to notify role {} to {}", |
| newRole, deviceId); |
| return; |
| } |
| |
| try { |
| handshaker.roleChanged(preference, mastershipInfo.term()); |
| } catch (UnsupportedOperationException e) { |
| // Preference-based method not supported. |
| handshaker.roleChanged(newRole); |
| } |
| } |
| |
| @Override |
| public boolean isReachable(DeviceId deviceId) { |
| final DeviceHandshaker handshaker = getBehaviour( |
| deviceId, DeviceHandshaker.class); |
| if (handshaker == null) { |
| return false; |
| } |
| return handshaker.isReachable(); |
| } |
| |
| @Override |
| public boolean isAvailable(DeviceId deviceId) { |
| final DeviceHandshaker handshaker = getBehaviour( |
| deviceId, DeviceHandshaker.class); |
| if (handshaker == null) { |
| return false; |
| } |
| try { |
| // Try without probing the device... |
| return handshaker.isAvailable(); |
| } catch (UnsupportedOperationException e) { |
| // Driver does not support that. |
| return probeAvailability(handshaker); |
| } |
| } |
| |
| @Override |
| public void changePortState(DeviceId deviceId, PortNumber portNumber, |
| boolean enable) { |
| if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) { |
| log.warn("Missing PortAdmin behaviour on {}, aborting port state change", |
| deviceId); |
| return; |
| } |
| final PortAdmin portAdmin = deviceService.getDevice(deviceId) |
| .as(PortAdmin.class); |
| final CompletableFuture<Boolean> modifyTask = enable |
| ? portAdmin.enable(portNumber) |
| : portAdmin.disable(portNumber); |
| final String descr = (enable ? "enabling" : "disabling") + " port " + portNumber; |
| getFutureWithDeadline( |
| modifyTask, descr, deviceId, null, opTimeoutShort); |
| } |
| |
| @Override |
| public void triggerDisconnect(DeviceId deviceId) { |
| checkNotNull(deviceId); |
| log.info("Triggering disconnection of device {}", deviceId); |
| submitTask(deviceId, TaskType.CONNECTION_TEARDOWN); |
| } |
| |
| /** |
| * Listener for configuration events. |
| */ |
| private class InternalNetworkConfigListener implements NetworkConfigListener { |
| @Override |
| public void event(NetworkConfigEvent event) { |
| DeviceId deviceId = (DeviceId) event.subject(); |
| switch (event.type()) { |
| case CONFIG_ADDED: |
| if (configIsComplete(deviceId)) { |
| submitTask(deviceId, TaskType.CONNECTION_SETUP); |
| } |
| break; |
| case CONFIG_UPDATED: |
| if (configIsComplete(deviceId) && mgmtAddrUpdated(event)) { |
| submitTask(deviceId, TaskType.CONNECTION_UPDATE); |
| } |
| break; |
| case CONFIG_REMOVED: |
| if (event.configClass().equals(BasicDeviceConfig.class)) { |
| submitTask(deviceId, TaskType.CONNECTION_TEARDOWN); |
| } |
| break; |
| default: |
| // Ignore |
| break; |
| } |
| } |
| |
| private boolean mgmtAddrUpdated(NetworkConfigEvent event) { |
| if (!event.prevConfig().isPresent() || !event.config().isPresent()) { |
| return false; |
| } |
| final BasicDeviceConfig prev = (BasicDeviceConfig) event.prevConfig().get(); |
| final BasicDeviceConfig current = (BasicDeviceConfig) event.config().get(); |
| return !Objects.equals(prev.managementAddress(), current.managementAddress()); |
| } |
| |
| @Override |
| public boolean isRelevant(NetworkConfigEvent event) { |
| return event.configClass().equals(BasicDeviceConfig.class) && |
| (event.subject() instanceof DeviceId) && |
| myScheme((DeviceId) event.subject()); |
| } |
| } |
| |
| /** |
| * Listener for device agent events. |
| */ |
| private class InternalDeviceAgentListener implements DeviceAgentListener { |
| @Override |
| public void event(DeviceAgentEvent event) { |
| DeviceId deviceId = event.subject(); |
| switch (event.type()) { |
| case CHANNEL_OPEN: |
| submitTask(deviceId, TaskType.CHANNEL_OPEN); |
| break; |
| case CHANNEL_CLOSED: |
| case CHANNEL_ERROR: |
| submitTask(deviceId, TaskType.CHANNEL_CLOSED); |
| break; |
| case ROLE_MASTER: |
| submitTask(deviceId, TaskType.ROLE_MASTER); |
| break; |
| case ROLE_STANDBY: |
| submitTask(deviceId, TaskType.ROLE_STANDBY); |
| break; |
| case ROLE_NONE: |
| submitTask(deviceId, TaskType.ROLE_NONE); |
| break; |
| case NOT_MASTER: |
| submitTask(deviceId, TaskType.NOT_MASTER); |
| break; |
| default: |
| log.warn("Unrecognized device agent event {}", event.type()); |
| } |
| } |
| } |
| |
| /** |
| * Pipeline event listener. |
| */ |
| private class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener { |
| @Override |
| public void event(PiPipeconfWatchdogEvent event) { |
| final DeviceId deviceId = event.subject(); |
| switch (event.type()) { |
| case PIPELINE_READY: |
| submitTask(deviceId, TaskType.PIPELINE_READY); |
| break; |
| case PIPELINE_UNKNOWN: |
| submitTask(deviceId, TaskType.PIPELINE_NOT_READY); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| @Override |
| public boolean isRelevant(PiPipeconfWatchdogEvent event) { |
| return myScheme(event.subject()); |
| } |
| } |
| |
| private void startOrRescheduleProbeTask() { |
| synchronized (this) { |
| if (probeTask != null) { |
| probeTask.cancel(false); |
| } |
| probeTask = probeExecutor.scheduleAtFixedRate( |
| this::submitProbeTasks, |
| 0, |
| probeInterval, |
| TimeUnit.SECONDS); |
| } |
| } |
| |
| private void submitProbeTasks() { |
| // Async trigger a task for all devices in the cfg. |
| log.debug("Starting probing for all devices"); |
| cfgService.getSubjects(DeviceId.class).stream() |
| .filter(GeneralDeviceProvider::myScheme) |
| .forEach(this::submitProbeTask); |
| } |
| |
| private void submitProbeTask(DeviceId deviceId) { |
| final DeviceHandshaker handshaker = handshakersWithListeners.get(deviceId); |
| |
| if (handshaker == null) { |
| if (configIsComplete(deviceId)) { |
| // Device in config but we have not initiated a connection. |
| // Perhaps we missed the config event? |
| submitTask(deviceId, TaskType.CONNECTION_SETUP); |
| } |
| return; |
| } |
| |
| if (!handshaker.isConnected()) { |
| // Device is in the core, but driver reports there is NOT a |
| // connection to it. Perhaps the netcfg changed and we didn't |
| // pick the event? |
| log.warn("Re-establishing lost connection to {}", deviceId); |
| submitTask(deviceId, TaskType.CONNECTION_TEARDOWN); |
| submitTask(deviceId, TaskType.CONNECTION_SETUP); |
| return; |
| } |
| |
| // On probing offline devices, while we expect them to signal |
| // availability via CHANNEL_OPEN or similar events, periodic probing |
| // might be needed to stimulate some channel activity. We might consider |
| // requiring active probing of closed channels in the protocol layer. |
| |
| final Long lastProbe = lastProbedAvailability.get(deviceId); |
| if (lastProbe != null && |
| (currentTimeMillis() - lastProbe) < (probeInterval * 1000 / 3)) { |
| // This avoids overload of probe tasks which might involve sending |
| // messages over the network. We require a minimum interval of 1/3 |
| // of the configured probeInterval between consecutive probe tasks. |
| if (log.isDebugEnabled()) { |
| log.debug("Dropping probe task for {} as it happened recently", |
| deviceId); |
| } |
| return; |
| } |
| |
| submitTask(deviceId, TaskType.PROBE_AVAILABILITY); |
| } |
| |
/**
 * Type of tasks performed by this provider. Each type is bound to a handler
 * when the task is submitted.
 */
enum TaskType {
    // Set up a connection to a device with a complete configuration.
    CONNECTION_SETUP,
    // Re-create the connection if the driver reports it is no longer there.
    CONNECTION_UPDATE,
    // Tear down the connection and mark the device offline if needed.
    CONNECTION_TEARDOWN,
    // Pipeline reported as ready; handled as an availability probe.
    PIPELINE_READY,
    // Channel reported as open; handled as an availability probe.
    CHANNEL_OPEN,
    // Channel closed or in error; device is marked offline if needed.
    CHANNEL_CLOSED,
    // Pipeline status unknown; device is marked offline if needed.
    PIPELINE_NOT_READY,
    // Check role and availability of the device.
    PROBE_AVAILABILITY,
    // Device asserted role MASTER for this node.
    ROLE_MASTER,
    // Device asserted role NONE for this node.
    ROLE_NONE,
    // Device asserted role STANDBY for this node.
    ROLE_STANDBY,
    // Device notified that this node is not master.
    NOT_MASTER,
}
| |
| private void submitTask(DeviceId deviceId, TaskType taskType) { |
| taskExecutor.submit(deviceId, taskType, taskRunnable(deviceId, taskType)); |
| } |
| |
| private Runnable taskRunnable(DeviceId deviceId, TaskType taskType) { |
| switch (taskType) { |
| case CONNECTION_SETUP: |
| return () -> handleConnectionSetup(deviceId); |
| case CONNECTION_UPDATE: |
| return () -> handleConnectionUpdate(deviceId); |
| case CONNECTION_TEARDOWN: |
| return () -> handleConnectionTeardown(deviceId); |
| case CHANNEL_OPEN: |
| return () -> handleProbeAvailability(deviceId); |
| case CHANNEL_CLOSED: |
| return () -> markOfflineIfNeeded(deviceId); |
| case PIPELINE_NOT_READY: |
| return () -> markOfflineIfNeeded(deviceId); |
| case PIPELINE_READY: |
| return () -> handleProbeAvailability(deviceId); |
| case PROBE_AVAILABILITY: |
| return () -> handleProbeAvailability(deviceId); |
| case ROLE_MASTER: |
| return () -> handleMastershipResponse(deviceId, MastershipRole.MASTER); |
| case ROLE_STANDBY: |
| return () -> handleMastershipResponse(deviceId, MastershipRole.STANDBY); |
| case ROLE_NONE: |
| return () -> handleMastershipResponse(deviceId, MastershipRole.NONE); |
| case NOT_MASTER: |
| return () -> handleNotMaster(deviceId); |
| default: |
| throw new IllegalArgumentException("Unrecognized task type " + taskType); |
| } |
| } |
| |
| private void handleConnectionSetup(DeviceId deviceId) { |
| assertConfig(deviceId); |
| // Bind pipeconf (if any and if device is capable). |
| bindPipeconfIfRequired(deviceId); |
| // Get handshaker. |
| final DeviceHandshaker handshaker = handshakerOrFail(deviceId); |
| if (handshaker.isConnected() || handshakersWithListeners.containsKey(deviceId)) { |
| throw new DeviceTaskException("connection already exists"); |
| } |
| // Add device agent listener. |
| handshaker.addDeviceAgentListener(id(), deviceAgentListener); |
| handshakersWithListeners.put(deviceId, handshaker); |
| // Start connection via handshaker. |
| final Boolean connectSuccess = getFutureWithDeadline( |
| handshaker.connect(), "initiating connection", |
| deviceId, false, opTimeoutShort); |
| if (!connectSuccess) { |
| // Failed! Remove listeners. |
| handshaker.removeDeviceAgentListener(id()); |
| handshakersWithListeners.remove(deviceId); |
| throw new DeviceTaskException("connection failed"); |
| } |
| createOrUpdateDevice(deviceId, false); |
| final List<PortDescription> ports = getPortDetails(deviceId); |
| providerService.updatePorts(deviceId, ports); |
| // From here we expect a CHANNEL_OPEN event to update availability. |
| } |
| |
| private void handleConnectionUpdate(DeviceId deviceId) { |
| assertConfig(deviceId); |
| final DeviceHandshaker handshaker = handshakerOrFail(deviceId); |
| if (!handshaker.isConnected()) { |
| // If driver reports that a connection still exists, perhaps the |
| // part of the netcfg that changed does not affect the connection. |
| // Otherwise, remove any previous connection state from the old |
| // netcfg and create a new one. |
| log.warn("Detected change of connection endpoints for {}, will " + |
| "tear down existing connection and set up a new one...", |
| deviceId); |
| handleConnectionTeardown(deviceId); |
| handleConnectionSetup(deviceId); |
| } |
| } |
| |
| private void createOrUpdateDevice(DeviceId deviceId, boolean available) { |
| if (deviceService.getDevice(deviceId) != null |
| && deviceService.isAvailable(deviceId) == available) { |
| // Other nodes might have advertised this device before us. |
| return; |
| } |
| assertConfig(deviceId); |
| providerService.deviceConnected(deviceId, getDeviceDescription( |
| deviceId, available)); |
| } |
| |
| private boolean probeAvailability(DeviceHandshaker handshaker) { |
| lastProbedAvailability.put(handshaker.data().deviceId(), currentTimeMillis()); |
| return getFutureWithDeadline( |
| handshaker.probeAvailability(), "probing availability", |
| handshaker.data().deviceId(), false, opTimeoutShort); |
| } |
| |
| private boolean probeReachability(DeviceHandshaker handshaker) { |
| lastProbedAvailability.put(handshaker.data().deviceId(), currentTimeMillis()); |
| return getFutureWithDeadline( |
| handshaker.probeReachability(), "probing reachability", |
| handshaker.data().deviceId(), false, opTimeoutShort); |
| } |
| |
| private void markOfflineIfNeeded(DeviceId deviceId) { |
| assertDeviceRegistered(deviceId); |
| if (deviceService.isAvailable(deviceId)) { |
| providerService.deviceDisconnected(deviceId); |
| } |
| } |
| |
| private void handleProbeAvailability(DeviceId deviceId) { |
| assertDeviceRegistered(deviceId); |
| |
| // Make device has a valid mastership role. |
| final DeviceHandshaker handshaker = handshakerOrFail(deviceId); |
| final MastershipRole deviceRole = handshaker.getRole(); |
| final MastershipRole expectedRole = mastershipService.getLocalRole(deviceId); |
| if (expectedRole == MastershipRole.NONE || expectedRole != deviceRole) { |
| // Device does NOT have a valid role... |
| if (!handshaker.isReachable() && !probeReachability(handshaker)) { |
| // ...but is not reachable. There isn't much we can do. |
| markOfflineIfNeeded(deviceId); |
| return; |
| } |
| // ...and is reachable, re-assert role. |
| roleChanged(deviceId, expectedRole == MastershipRole.NONE |
| ? mastershipService.requestRoleForSync(deviceId) |
| : expectedRole); |
| try { |
| // Wait for role to be notified and reachability state to be |
| // updated. This should be roughly equivalent to one RTT. |
| Thread.sleep(500); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| return; |
| } |
| } |
| |
| // Check and update availability. |
| if (probeAvailability(handshakerOrFail(deviceId))) { |
| // Device ready to do its job. |
| createOrUpdateDevice(deviceId, true); |
| } else { |
| markOfflineIfNeeded(deviceId); |
| if (handshaker.isReachable() && isPipelineProgrammable(deviceId)) { |
| // If reachable, but not available, and pipeline programmable, there |
| // is a high chance it's because the pipeline is not READY |
| // (independently from what the pipeconf watchdog reports, as the |
| // status there might be outdated). Encourage pipeconf watchdog to |
| // perform a pipeline probe ASAP. |
| pipeconfWatchdogService.triggerProbe(deviceId); |
| } |
| } |
| } |
| |
| private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) { |
| assertDeviceRegistered(deviceId); |
| log.debug("Device {} asserted role {}", deviceId, response); |
| providerService.receivedRoleReply(deviceId, response); |
| } |
| |
| private void handleNotMaster(DeviceId deviceId) { |
| assertDeviceRegistered(deviceId); |
| if (mastershipService.isLocalMaster(deviceId)) { |
| log.warn("Device {} notified that this node is not master, " + |
| "relinquishing mastership...", deviceId); |
| mastershipService.relinquishMastership(deviceId); |
| } |
| } |
| |
| private void assertDeviceRegistered(DeviceId deviceId) { |
| if (deviceService.getDevice(deviceId) == null) { |
| throw new DeviceTaskException("device not registered in the core"); |
| } |
| } |
| |
| private void handleConnectionTeardown(DeviceId deviceId) { |
| if (deviceService.getDevice(deviceId) != null |
| && deviceService.isAvailable(deviceId)) { |
| providerService.deviceDisconnected(deviceId); |
| } |
| final DeviceHandshaker handshaker = handshakerOrFail(deviceId); |
| handshaker.removeDeviceAgentListener(id()); |
| handshakersWithListeners.remove(deviceId); |
| handshaker.disconnect(); |
| lastProbedAvailability.remove(deviceId); |
| } |
| |
| private void bindPipeconfIfRequired(DeviceId deviceId) { |
| if (pipeconfService.getPipeconf(deviceId).isPresent() |
| || !isPipelineProgrammable(deviceId)) { |
| // Nothing to do. |
| // Device has already a pipeconf or is not programmable. |
| return; |
| } |
| // Get pipeconf from netcfg or driver (default one). |
| final PiPipelineProgrammable pipelineProg = getBehaviour( |
| deviceId, PiPipelineProgrammable.class); |
| final PiPipeconfId pipeconfId = getPipeconfId(deviceId, pipelineProg); |
| if (pipeconfId == null) { |
| throw new DeviceTaskException("unable to find pipeconf"); |
| } |
| // Store binding in pipeconf service. |
| pipeconfService.bindToDevice(pipeconfId, deviceId); |
| } |
| |
| private PiPipeconfId getPipeconfId(DeviceId deviceId, PiPipelineProgrammable pipelineProg) { |
| // Places to look for a pipeconf ID (in priority order)): |
| // 1) netcfg |
| // 2) device driver (default one) |
| final PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId); |
| if (pipeconfId != null && !pipeconfId.id().isEmpty()) { |
| return pipeconfId; |
| } |
| if (pipelineProg != null |
| && pipelineProg.getDefaultPipeconf().isPresent()) { |
| final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get(); |
| log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId); |
| return defaultPipeconf.id(); |
| } |
| return null; |
| } |
| |
| private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) { |
| BasicDeviceConfig config = cfgService.getConfig( |
| deviceId, BasicDeviceConfig.class); |
| if (config == null) { |
| return null; |
| } |
| return config.pipeconf() != null |
| ? new PiPipeconfId(config.pipeconf()) : null; |
| } |
| |
| private DeviceHandshaker handshakerOrFail(DeviceId deviceId) { |
| final DeviceHandshaker handshaker = getBehaviour( |
| deviceId, DeviceHandshaker.class); |
| if (handshaker == null) { |
| throw new DeviceTaskException("missing handshaker behavior"); |
| } |
| return handshaker; |
| } |
| |
| private boolean configIsComplete(DeviceId deviceId) { |
| final BasicDeviceConfig basicDeviceCfg = cfgService.getConfig( |
| deviceId, BasicDeviceConfig.class); |
| return basicDeviceCfg != null && !isNullOrEmpty(basicDeviceCfg.driver()); |
| } |
| |
| private void assertConfig(DeviceId deviceId) { |
| if (!configIsComplete(deviceId)) { |
| throw new DeviceTaskException("configuration is not complete"); |
| } |
| } |
| |
| private Driver getDriver(DeviceId deviceId) { |
| try { |
| // DriverManager checks first using basic device config. |
| return driverService.getDriver(deviceId); |
| } catch (ItemNotFoundException e) { |
| log.error("Driver not found for {}", deviceId); |
| return null; |
| } |
| } |
| |
| private <T extends Behaviour> T getBehaviour(DeviceId deviceId, Class<T> type) { |
| Driver driver = getDriver(deviceId); |
| if (driver == null) { |
| return null; |
| } |
| if (!driver.hasBehaviour(type)) { |
| return null; |
| } |
| final DriverData data = new DefaultDriverData(driver, deviceId); |
| final DefaultDriverHandler handler = new DefaultDriverHandler(data); |
| return driver.createBehaviour(handler, type); |
| } |
| |
| private boolean hasBehaviour(DeviceId deviceId, Class<? extends Behaviour> type) { |
| Driver driver = getDriver(deviceId); |
| if (driver == null) { |
| return false; |
| } |
| return driver.hasBehaviour(type); |
| } |
| |
| private DeviceDescription getDeviceDescription( |
| DeviceId deviceId, boolean defaultAvailable) { |
| // Get one from driver or forge. |
| final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour( |
| deviceId, DeviceDescriptionDiscovery.class); |
| if (deviceDiscovery == null) { |
| return forgeDeviceDescription(deviceId, defaultAvailable); |
| } |
| |
| final DeviceDescription d = deviceDiscovery.discoverDeviceDetails(); |
| if (d == null) { |
| return forgeDeviceDescription(deviceId, defaultAvailable); |
| } |
| // Enforce defaultAvailable flag over the one obtained from driver. |
| return new DefaultDeviceDescription(d, defaultAvailable, d.annotations()); |
| } |
| |
| private List<PortDescription> getPortDetails(DeviceId deviceId) { |
| final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour( |
| deviceId, DeviceDescriptionDiscovery.class); |
| if (deviceDiscovery != null) { |
| return deviceDiscovery.discoverPortDetails(); |
| } else { |
| return Collections.emptyList(); |
| } |
| } |
| |
| private DeviceDescription forgeDeviceDescription( |
| DeviceId deviceId, boolean defaultAvailable) { |
| // Uses handshaker and provider config to get driver data. |
| final DeviceHandshaker handshaker = getBehaviour( |
| deviceId, DeviceHandshaker.class); |
| final Driver driver = handshaker != null |
| ? handshaker.handler().driver() : null; |
| return new DefaultDeviceDescription( |
| deviceId.uri(), |
| Device.Type.SWITCH, |
| driver != null ? driver.manufacturer() : UNKNOWN, |
| driver != null ? driver.hwVersion() : UNKNOWN, |
| driver != null ? driver.swVersion() : UNKNOWN, |
| UNKNOWN, |
| new ChassisId(), |
| defaultAvailable, |
| DefaultAnnotations.EMPTY); |
| } |
| |
| static boolean myScheme(DeviceId deviceId) { |
| return deviceId.uri().getScheme().equals(URI_SCHEME); |
| } |
| |
| private boolean isPipelineProgrammable(DeviceId deviceId) { |
| return hasBehaviour(deviceId, PiPipelineProgrammable.class); |
| } |
| |
| private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription, |
| DeviceId deviceId, U defaultValue, int timeout) { |
| try { |
| return future.get(timeout, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| log.error("Thread interrupted while {} on {}", opDescription, deviceId); |
| Thread.currentThread().interrupt(); |
| } catch (ExecutionException e) { |
| log.error("Exception while {} on {}", opDescription, deviceId, e.getCause()); |
| } catch (TimeoutException e) { |
| log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId); |
| } |
| return defaultValue; |
| } |
| } |