Further improvements to connection handling for gRPC-based devices
Force reset of gRPC connection backoff when probing for reachability.
This allows provider to attempt reconnection when needed, instead of
depending on the channel backoff timer.
Improved checkup task in GDP to better handle mastership
flapping observed when reconnecting devices.
Change-Id: I473fb14160b2eb744a483de431b91f9f6bcdab95
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
index 5b3153c..f7c33c3 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
@@ -101,7 +101,7 @@
"A %s client already exists for %s", serviceName, deviceId));
}
- log.info("Creating {}...", clientName(deviceId));
+ log.debug("Creating {}...", clientName(deviceId));
final C client;
try {
@@ -135,7 +135,7 @@
withDeviceLock(() -> {
final C client = clients.remove(deviceId);
if (client != null) {
- log.info("Removing {}...", clientName(deviceId));
+ log.debug("Removing {}...", clientName(deviceId));
client.shutdown();
}
return null;
diff --git a/protocols/grpc/utils/src/main/java/org/onosproject/grpc/utils/AbstractGrpcHandshaker.java b/protocols/grpc/utils/src/main/java/org/onosproject/grpc/utils/AbstractGrpcHandshaker.java
index 0183133..4cfb63b 100644
--- a/protocols/grpc/utils/src/main/java/org/onosproject/grpc/utils/AbstractGrpcHandshaker.java
+++ b/protocols/grpc/utils/src/main/java/org/onosproject/grpc/utils/AbstractGrpcHandshaker.java
@@ -17,6 +17,7 @@
package org.onosproject.grpc.utils;
import com.google.common.util.concurrent.Striped;
+import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import org.onosproject.grpc.api.GrpcChannelController;
import org.onosproject.grpc.api.GrpcClient;
@@ -159,6 +160,7 @@
if (!setupBehaviour("probeReachability()")) {
return completedFuture(false);
}
+ resetChannelConnectBackoffIfNeeded();
return client.probeService();
}
@@ -175,4 +177,26 @@
handler().get(controllerClass)
.removeDeviceAgentListener(data().deviceId(), providerId);
}
+
+ private void resetChannelConnectBackoffIfNeeded() {
+ // Stimulate channel reconnect if in failure state.
+ final ManagedChannel channel = getExistingChannel();
+ if (channel == null) {
+ // Where did the channel go?
+ return;
+ }
+ if (channel.getState(false)
+ .equals(ConnectivityState.TRANSIENT_FAILURE)) {
+ channel.resetConnectBackoff();
+ }
+ }
+
+ private ManagedChannel getExistingChannel() {
+ final DeviceId deviceId = data().deviceId();
+ if (CHANNEL_URIS.containsKey(deviceId)) {
+ return handler().get(GrpcChannelController.class)
+ .get(CHANNEL_URIS.get(deviceId)).orElse(null);
+ }
+ return null;
+ }
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
index cf3cda0..b2516b4 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
@@ -187,7 +187,12 @@
deviceId, requestedToBeMaster.get(),
pendingElectionId, masterElectionId,
streamChannelManager.isOpen());
+ // Optimistically set the reported master status, if wrong, it
+ // will be updated by the arbitration response. This alleviates
+ // race conditions when calling isMaster() right after setting
+ // mastership.
sendMasterArbitrationUpdate(pendingElectionId);
+ isMaster.set(requestedToBeMaster.get());
pendingElectionId = null;
pendingElectionIdTimestamp = 0;
// No need to listen for master election ID changes.
@@ -199,7 +204,9 @@
@Override
public boolean isMaster(long p4DeviceId) {
checkArgument(this.p4DeviceId == p4DeviceId);
- return isMaster.get();
+ synchronized (requestedToBeMaster) {
+ return isMaster.get();
+ }
}
@Override
@@ -397,16 +404,11 @@
void signalClosed() {
synchronized (this) {
final boolean wasOpen = open.getAndSet(false);
- // FIXME: in case of device disconnection, all clients will
- // signal role NONE, preventing the DeviceManager to mark the
- // device as offline, as only the master can do that. We should
- // change the DeviceManager. For now, we disable signaling role
- // NONE.
- // if (wasOpen) {
- // // We lost any valid mastership role.
- // controller.postEvent(new DeviceAgentEvent(
- // DeviceAgentEvent.Type.ROLE_NONE, deviceId));
- // }
+ if (wasOpen) {
+ // We lost any valid mastership role.
+ controller.postEvent(new DeviceAgentEvent(
+ DeviceAgentEvent.Type.ROLE_NONE, deviceId));
+ }
}
}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index e716f56..f983ac6 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -20,6 +20,7 @@
import com.google.common.util.concurrent.Futures;
import org.onlab.packet.ChassisId;
import org.onlab.util.ItemNotFoundException;
+import org.onlab.util.SharedScheduledExecutors;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
@@ -81,7 +82,6 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -90,10 +90,9 @@
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.Executors.newFixedThreadPool;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_INTERVAL;
-import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_INTERVAL_DEFAULT;
+import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.CHECKUP_INTERVAL;
+import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.CHECKUP_INTERVAL_DEFAULT;
import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_INTERVAL;
import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_INTERVAL_DEFAULT;
import static org.slf4j.LoggerFactory.getLogger;
@@ -106,7 +105,7 @@
*/
@Component(immediate = true,
property = {
- PROBE_INTERVAL + ":Integer=" + PROBE_INTERVAL_DEFAULT,
+ CHECKUP_INTERVAL + ":Integer=" + CHECKUP_INTERVAL_DEFAULT,
STATS_POLL_INTERVAL + ":Integer=" + STATS_POLL_INTERVAL_DEFAULT,
})
public class GeneralDeviceProvider extends AbstractProvider
@@ -164,7 +163,7 @@
/**
* Configure interval for checking device availability; default is 10 sec.
*/
- private int probeInterval = PROBE_INTERVAL_DEFAULT;
+ private int checkupInterval = CHECKUP_INTERVAL_DEFAULT;
/**
* Configure poll frequency for port status and stats; default is 10 sec.
@@ -172,15 +171,14 @@
private int statsPollInterval = STATS_POLL_INTERVAL_DEFAULT;
private final Map<DeviceId, DeviceHandshaker> handshakersWithListeners = Maps.newConcurrentMap();
- private final Map<DeviceId, Long> lastProbedAvailability = Maps.newConcurrentMap();
+ private final Map<DeviceId, Long> lastCheckups = Maps.newConcurrentMap();
private final InternalPipeconfWatchdogListener pipeconfWatchdogListener = new InternalPipeconfWatchdogListener();
private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
private ExecutorService mainExecutor;
private DeviceTaskExecutor<TaskType> taskExecutor;
- private ScheduledExecutorService probeExecutor;
- private ScheduledFuture<?> probeTask;
+ private ScheduledFuture<?> checkupTask;
private StatsPoller statsPoller;
private DeviceProviderService providerService;
@@ -195,10 +193,8 @@
@Activate
public void activate(ComponentContext context) {
mainExecutor = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
- "onos/gdp-task", "%d", log));
+ "onos/gdp", "%d", log));
taskExecutor = new DeviceTaskExecutor<>(mainExecutor);
- probeExecutor = newSingleThreadScheduledExecutor(groupedThreads(
- "onos/gdp-probe", "%d", log));
providerService = providerRegistry.register(this);
componentConfigService.registerProperties(getClass());
coreService.registerApplication(APP_NAME);
@@ -207,7 +203,7 @@
gnmiDeviceStateSubscriber = new GnmiDeviceStateSubscriber(
gnmiController, deviceService, mastershipService, providerService);
gnmiDeviceStateSubscriber.activate();
- startOrRescheduleProbeTask();
+ startOrReschedulePeriodicCheckupTasks();
statsPoller = new StatsPoller(deviceService, mastershipService, providerService);
statsPoller.activate(statsPollInterval);
modified(context);
@@ -221,19 +217,19 @@
}
Dictionary<?, ?> properties = context.getProperties();
- final int oldProbeFrequency = probeInterval;
- probeInterval = Tools.getIntegerProperty(
- properties, PROBE_INTERVAL, PROBE_INTERVAL_DEFAULT);
+ final int oldCheckupInterval = checkupInterval;
+ checkupInterval = Tools.getIntegerProperty(
+ properties, CHECKUP_INTERVAL, CHECKUP_INTERVAL_DEFAULT);
log.info("Configured. {} is configured to {} seconds",
- PROBE_INTERVAL, probeInterval);
+ CHECKUP_INTERVAL, checkupInterval);
final int oldStatsPollFrequency = statsPollInterval;
statsPollInterval = Tools.getIntegerProperty(
properties, STATS_POLL_INTERVAL, STATS_POLL_INTERVAL_DEFAULT);
log.info("Configured. {} is configured to {} seconds",
STATS_POLL_INTERVAL, statsPollInterval);
- if (oldProbeFrequency != probeInterval) {
- startOrRescheduleProbeTask();
+ if (oldCheckupInterval != checkupInterval) {
+ startOrReschedulePeriodicCheckupTasks();
}
if (oldStatsPollFrequency != statsPollInterval) {
@@ -246,16 +242,9 @@
// Shutdown stats poller.
statsPoller.deactivate();
statsPoller = null;
- // Shutdown probe executor.
- probeTask.cancel(true);
- probeTask = null;
- probeExecutor.shutdownNow();
- try {
- probeExecutor.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("probeExecutor not terminated properly");
- }
- probeExecutor = null;
+ // Shutdown periodic checkup task.
+ checkupTask.cancel(false);
+ checkupTask = null;
// Shutdown main and task executor.
taskExecutor.cancel();
taskExecutor = null;
@@ -270,7 +259,7 @@
handshakersWithListeners.values().forEach(h -> h.removeDeviceAgentListener(id()));
handshakersWithListeners.clear();
// Other cleanup.
- lastProbedAvailability.clear();
+ lastCheckups.clear();
componentConfigService.unregisterProperties(getClass(), false);
cfgService.removeListener(cfgListener);
pipeconfWatchdogService.removeListener(pipeconfWatchdogListener);
@@ -284,12 +273,11 @@
@Override
public void triggerProbe(DeviceId deviceId) {
checkNotNull(deviceId);
- submitTask(deviceId, TaskType.PROBE_AVAILABILITY);
+ submitTask(deviceId, TaskType.CHECKUP);
}
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
-
final MastershipInfo mastershipInfo = mastershipService.getMastershipFor(deviceId);
final NodeId localNodeId = clusterService.getLocalNode().id();
@@ -300,6 +288,14 @@
newRole = mastershipInfo.getRole(localNodeId);
}
+ final DeviceHandshaker handshaker = getBehaviour(
+ deviceId, DeviceHandshaker.class);
+ if (handshaker == null) {
+ log.error("Null handshaker. Unable to notify role {} to {}",
+ newRole, deviceId);
+ return;
+ }
+
// Derive preference value.
final int preference;
switch (newRole) {
@@ -313,14 +309,15 @@
log.error("Unable to derive mastership preference for {}, " +
"requested role {} but local node ID was " +
"not found among list of backup nodes " +
- "reported by mastership service");
+ "reported by mastership service",
+ deviceId, newRole);
return;
}
break;
case NONE:
// No preference for NONE, apply as is.
log.info("Notifying role {} to {}", newRole, deviceId);
- roleChanged(deviceId, newRole);
+ handshaker.roleChanged(newRole);
return;
default:
log.error("Unrecognized mastership role {}", newRole);
@@ -330,14 +327,6 @@
log.info("Notifying role {} (preference {}) for term {} to {}",
newRole, preference, mastershipInfo.term(), deviceId);
- final DeviceHandshaker handshaker = getBehaviour(
- deviceId, DeviceHandshaker.class);
- if (handshaker == null) {
- log.error("Null handshaker. Unable to notify role {} to {}",
- newRole, deviceId);
- return;
- }
-
try {
handshaker.roleChanged(preference, mastershipInfo.term());
} catch (UnsupportedOperationException e) {
@@ -413,12 +402,12 @@
DeviceId deviceId = (DeviceId) event.subject();
switch (event.type()) {
case CONFIG_ADDED:
- if (configIsComplete(deviceId)) {
+ if (configIsPresent(deviceId)) {
submitTask(deviceId, TaskType.CONNECTION_SETUP);
}
break;
case CONFIG_UPDATED:
- if (configIsComplete(deviceId) && mgmtAddrUpdated(event)) {
+ if (configIsPresent(deviceId) && mgmtAddrUpdated(event)) {
submitTask(deviceId, TaskType.CONNECTION_UPDATE);
}
break;
@@ -472,7 +461,12 @@
submitTask(deviceId, TaskType.ROLE_STANDBY);
break;
case ROLE_NONE:
- submitTask(deviceId, TaskType.ROLE_NONE);
+ // FIXME: in case of device disconnection, agents will
+ // signal role NONE, preventing the DeviceManager to mark
+ // the device as offline, as only the master can do that. We
+ // should change the DeviceManager. For now, we disable
+ // signaling role NONE.
+ // submitTask(deviceId, TaskType.ROLE_NONE);
break;
case NOT_MASTER:
submitTask(deviceId, TaskType.NOT_MASTER);
@@ -508,68 +502,26 @@
}
}
- private void startOrRescheduleProbeTask() {
+ private void startOrReschedulePeriodicCheckupTasks() {
synchronized (this) {
- if (probeTask != null) {
- probeTask.cancel(false);
+ if (checkupTask != null) {
+ checkupTask.cancel(false);
}
- probeTask = probeExecutor.scheduleAtFixedRate(
- this::submitProbeTasks,
- 0,
- probeInterval,
- TimeUnit.SECONDS);
+ checkupTask = SharedScheduledExecutors.getPoolThreadExecutor()
+ .scheduleAtFixedRate(
+ this::submitCheckupTasksForAllDevices,
+ 1,
+ checkupInterval,
+ TimeUnit.SECONDS);
}
}
- private void submitProbeTasks() {
+ private void submitCheckupTasksForAllDevices() {
// Async trigger a task for all devices in the cfg.
- log.debug("Starting probing for all devices");
+ log.debug("Submitting checkup task for all devices...");
cfgService.getSubjects(DeviceId.class).stream()
.filter(GeneralDeviceProvider::myScheme)
- .forEach(this::submitProbeTask);
- }
-
- private void submitProbeTask(DeviceId deviceId) {
- final DeviceHandshaker handshaker = handshakersWithListeners.get(deviceId);
-
- if (handshaker == null) {
- if (configIsComplete(deviceId)) {
- // Device in config but we have not initiated a connection.
- // Perhaps we missed the config event?
- submitTask(deviceId, TaskType.CONNECTION_SETUP);
- }
- return;
- }
-
- if (!handshaker.hasConnection()) {
- // Device is in the core, but driver reports there is NOT a
- // connection to it. Perhaps the netcfg changed and we didn't
- // pick the event?
- log.warn("Re-establishing lost connection to {}", deviceId);
- submitTask(deviceId, TaskType.CONNECTION_TEARDOWN);
- submitTask(deviceId, TaskType.CONNECTION_SETUP);
- return;
- }
-
- // On probing offline devices, while we expect them to signal
- // availability via CHANNEL_OPEN or similar events, periodic probing
- // might be needed to stimulate some channel activity. We might consider
- // requiring active probing of closed channels in the protocol layer.
-
- final Long lastProbe = lastProbedAvailability.get(deviceId);
- if (lastProbe != null &&
- (currentTimeMillis() - lastProbe) < (probeInterval * 1000 / 3)) {
- // This avoids overload of probe tasks which might involve sending
- // messages over the network. We require a minimum interval of 1/3
- // of the configured probeInterval between consecutive probe tasks.
- if (log.isDebugEnabled()) {
- log.debug("Dropping probe task for {} as it happened recently",
- deviceId);
- }
- return;
- }
-
- submitTask(deviceId, TaskType.PROBE_AVAILABILITY);
+ .forEach(d -> submitTask(d, TaskType.CHECKUP));
}
/**
@@ -583,7 +535,7 @@
CHANNEL_OPEN,
CHANNEL_CLOSED,
PIPELINE_NOT_READY,
- PROBE_AVAILABILITY,
+ CHECKUP,
ROLE_MASTER,
ROLE_NONE,
ROLE_STANDBY,
@@ -603,9 +555,9 @@
case CONNECTION_TEARDOWN:
return () -> handleConnectionTeardown(deviceId);
case CHANNEL_OPEN:
- case PROBE_AVAILABILITY:
+ case CHECKUP:
case PIPELINE_READY:
- return () -> handleProbeAvailability(deviceId);
+ return () -> doCheckupAndRepair(deviceId);
case CHANNEL_CLOSED:
case PIPELINE_NOT_READY:
return () -> markOfflineIfNeeded(deviceId);
@@ -681,15 +633,9 @@
}
private boolean probeAvailability(DeviceHandshaker handshaker) {
- lastProbedAvailability.put(handshaker.data().deviceId(), currentTimeMillis());
return Futures.getUnchecked(handshaker.probeAvailability());
}
- private boolean probeReachability(DeviceHandshaker handshaker) {
- lastProbedAvailability.put(handshaker.data().deviceId(), currentTimeMillis());
- return Futures.getUnchecked(handshaker.probeReachability());
- }
-
private void markOfflineIfNeeded(DeviceId deviceId) {
assertDeviceRegistered(deviceId);
if (deviceService.isAvailable(deviceId)) {
@@ -697,46 +643,110 @@
}
}
- private void handleProbeAvailability(DeviceId deviceId) {
- assertDeviceRegistered(deviceId);
+ private void doCheckupAndRepair(DeviceId deviceId) {
- // Make sure device has a valid mastership role.
- final DeviceHandshaker handshaker = handshakerOrFail(deviceId);
- final MastershipRole deviceRole = handshaker.getRole();
- final MastershipRole expectedRole = mastershipService.getLocalRole(deviceId);
- if (expectedRole == MastershipRole.NONE || expectedRole != deviceRole) {
- // Device does NOT have a valid role...
- if (!handshaker.isReachable() && !probeReachability(handshaker)) {
- // ...but is not reachable. There isn't much we can do.
- markOfflineIfNeeded(deviceId);
- return;
- }
- // ...and is reachable, re-assert role.
- roleChanged(deviceId, expectedRole == MastershipRole.NONE
- ? mastershipService.requestRoleForSync(deviceId)
- : expectedRole);
- try {
- // Wait for role to be notified and reachability state to be
- // updated. This should be roughly equivalent to one RTT.
- Thread.sleep(500);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
- }
+ // This task should be invoked periodically for each device known by
+ // this provider, or as a consequence of events signaling potential
+ // availability changes of the device. We check that everything is in
+ // order, repair what's wrong, and eventually mark the the device as
+ // available (or not) in the core.
+
+ if (!configIsPresent(deviceId)) {
+ // We should have a connection only for devices in the config.
+ submitTask(deviceId, TaskType.CONNECTION_TEARDOWN);
+ return;
}
- // Check and update availability.
- if (probeAvailability(handshakerOrFail(deviceId))) {
+ final DeviceHandshaker handshaker = handshakersWithListeners.get(deviceId);
+ if (handshaker == null) {
+ // Device in config but we have not initiated a connection.
+ // Perhaps we missed the config event?
+ submitTask(deviceId, TaskType.CONNECTION_SETUP);
+ return;
+ }
+
+ // If here, we have a handshaker meaning we already connected once to
+ // the device...
+ if (!handshaker.hasConnection()) {
+ // ... but now the driver reports there is NOT a connection.
+ // Perhaps the netcfg changed and we didn't pick the event?
+ log.warn("Re-establishing lost connection to {}", deviceId);
+ submitTask(deviceId, TaskType.CONNECTION_TEARDOWN);
+ submitTask(deviceId, TaskType.CONNECTION_SETUP);
+ return;
+ }
+
+ // If here, device should be registered in the core.
+ assertDeviceRegistered(deviceId);
+
+ if (!handshaker.isReachable()) {
+ // Device appears to be offline.
+ markOfflineIfNeeded(deviceId);
+ // While we expect the protocol layer to implement some sort of
+ // connection backoff mechanism and to signal availability via
+ // CHANNEL_OPEN events, we stimulate some channel activity now.
+ // Trigger probe over the network and forget about it (not waiting
+ // for future to complete). If channel is ready, we expect to come
+ // back here via a CHANNEL_OPEN event.
+ handshaker.probeReachability();
+ return;
+ }
+
+ // If here, device is reachable. Now do mastership and availability
+ // checkups. To avoid overload of checkup tasks which might involve
+ // sending messages over the network and triggering mastership
+ // elections. We require a minimum interval of 1/3 of the configured
+ // checkupInterval between consecutive checkup tasks when the device is
+ // known to be available.
+
+ final Long lastCheckup = lastCheckups.get(deviceId);
+ final boolean isAvailable = deviceService.isAvailable(deviceId);
+ if (isAvailable && lastCheckup != null &&
+ (currentTimeMillis() - lastCheckup) < (checkupInterval * 1000 / 3)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Dropping checkup task for {} as it happened recently",
+ deviceId);
+ }
+ return;
+ }
+ lastCheckups.put(deviceId, currentTimeMillis());
+
+ // Make sure device has a valid mastership role.
+ final MastershipRole expectedRole = mastershipService.getLocalRole(deviceId);
+ if (expectedRole == MastershipRole.NONE) {
+ log.debug("Detected invalid role ({}) for {}, waiting for mastership " +
+ "service to fix this...",
+ expectedRole, deviceId);
+ // Gentle nudge to fix things...
+ mastershipService.requestRoleForSync(deviceId);
+ return;
+ }
+
+ final MastershipRole deviceRole = handshaker.getRole();
+ if (expectedRole != deviceRole) {
+ // FIXME: we should be checking the mastership term as well.
+ log.debug("Detected role mismatch for {}, core expects {}, " +
+ "but device reports {}, waiting for mastership " +
+ "service to fix this...",
+ deviceId, expectedRole, deviceRole);
+ // Gentle nudge to fix things...
+ providerService.receivedRoleReply(deviceId, deviceRole);
+ return;
+ }
+
+ // Check and update availability, which differently from reachability
+ // describes the ability of the device to forward packets.
+ if (probeAvailability(handshaker)) {
// Device ready to do its job.
createOrUpdateDevice(deviceId, true);
} else {
markOfflineIfNeeded(deviceId);
- if (handshaker.isReachable() && isPipelineProgrammable(deviceId)) {
- // If reachable, but not available, and pipeline programmable, there
- // is a high chance it's because the pipeline is not READY
- // (independently from what the pipeconf watchdog reports, as the
- // status there might be outdated). Encourage pipeconf watchdog to
- // perform a pipeline probe ASAP.
+ if (isPipelineProgrammable(deviceId)) {
+ // If reachable, but not available, and pipeline programmable,
+ // there is a high chance it's because the pipeline is not READY
+ // (independently from what the pipeconf watchdog reports, as
+ // the status there might be outdated). Encourage pipeconf
+ // watchdog to perform a pipeline probe ASAP.
pipeconfWatchdogService.triggerProbe(deviceId);
}
}
@@ -750,19 +760,19 @@
private void handleNotMaster(DeviceId deviceId) {
assertDeviceRegistered(deviceId);
- if (mastershipService.isLocalMaster(deviceId)) {
- log.warn("Device {} notified that this node is not master, " +
- "relinquishing mastership...", deviceId);
- mastershipService.relinquishMastership(deviceId);
- }
+ handleMastershipResponse(deviceId, handshakerOrFail(deviceId).getRole());
}
private void assertDeviceRegistered(DeviceId deviceId) {
- if (deviceService.getDevice(deviceId) == null) {
+ if (!deviceIsRegistered(deviceId)) {
throw new DeviceTaskException("device not registered in the core");
}
}
+ private boolean deviceIsRegistered(DeviceId deviceId) {
+ return deviceService.getDevice(deviceId) != null;
+ }
+
private void handleConnectionTeardown(DeviceId deviceId) {
if (deviceService.getDevice(deviceId) != null
&& deviceService.isAvailable(deviceId)) {
@@ -772,7 +782,7 @@
handshaker.removeDeviceAgentListener(id());
handshakersWithListeners.remove(deviceId);
handshaker.disconnect();
- lastProbedAvailability.remove(deviceId);
+ lastCheckups.remove(deviceId);
}
private void bindPipeconfIfRequired(DeviceId deviceId) {
@@ -789,6 +799,10 @@
if (pipeconfId == null) {
throw new DeviceTaskException("unable to find pipeconf");
}
+ if (!pipeconfService.getPipeconf(pipeconfId).isPresent()) {
+ throw new DeviceTaskException(format(
+ "pipeconf %s not registered", pipeconfId));
+ }
// Store binding in pipeconf service.
pipeconfService.bindToDevice(pipeconfId, deviceId);
}
@@ -829,14 +843,14 @@
return handshaker;
}
- private boolean configIsComplete(DeviceId deviceId) {
+ private boolean configIsPresent(DeviceId deviceId) {
final BasicDeviceConfig basicDeviceCfg = cfgService.getConfig(
deviceId, BasicDeviceConfig.class);
return basicDeviceCfg != null && !isNullOrEmpty(basicDeviceCfg.driver());
}
private void assertConfig(DeviceId deviceId) {
- if (!configIsComplete(deviceId)) {
+ if (!configIsPresent(deviceId)) {
throw new DeviceTaskException("configuration is not complete");
}
}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java
index cf867ee..f2f2d08 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java
@@ -26,7 +26,7 @@
public static final String STATS_POLL_INTERVAL = "statsPollInterval";
public static final int STATS_POLL_INTERVAL_DEFAULT = 10;
- public static final String PROBE_INTERVAL = "probeInterval";
- public static final int PROBE_INTERVAL_DEFAULT = 10;
+ public static final String CHECKUP_INTERVAL = "checkupInterval";
+ public static final int CHECKUP_INTERVAL_DEFAULT = 10;
}