Improve scalability of P4Runtime subsystem
The P4Runtime client was hanging (deadlocked) on a master arbitration
request. As a result, all other requests (e.g. table writes) were blocked
waiting for the client's request lock to become available.
Apart from fixing those deadlocks, this patch brings a number of
improvements that together make it possible to run networks of 100+
P4Runtime devices on a single ONOS instance (previously only ~20 devices).
Includes:
- Asynchronous mastership handling in DeviceHandshaker (as defined in
the P4Runtime and OpenFlow specs)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspected deadlocks in P4RuntimeClientImpl
- Use write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Re-purposed ChannelEvent as DeviceAgentEvent so that it can also carry
mastership response events
- Fixed IntelliJ warnings
- Various code and log clean-ups
Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 550ca59..2b31b91 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -50,9 +50,9 @@
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.basics.BasicDeviceConfig;
import org.onosproject.net.config.basics.SubjectFactories;
-import org.onosproject.net.device.ChannelEvent;
-import org.onosproject.net.device.ChannelListener;
import org.onosproject.net.device.DefaultDeviceDescription;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
import org.onosproject.net.device.DeviceEvent;
@@ -95,6 +95,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -102,6 +103,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.device.DeviceEvent.Type;
@@ -116,45 +118,48 @@
@Component(immediate = true)
public class GeneralDeviceProvider extends AbstractProvider
implements DeviceProvider {
- public static final String DRIVER = "driver";
- public static final int REACHABILITY_TIMEOUT = 10;
- public static final String DEPLOY = "deploy-";
- public static final String PIPECONF_TOPIC = "-pipeconf";
- public static final String CHECK = "check-";
- public static final String CONNECTION = "-connection";
+
+ // Timeout in seconds for operations on devices.
+ private static final int DEVICE_OP_TIMEOUT = 10;
+
+ private static final String DRIVER = "driver";
+ private static final String DEPLOY = "deploy-";
+ private static final String PIPECONF_TOPIC = "-pipeconf";
+ private static final String CHECK = "check-";
+ private static final String CONNECTION = "-connection";
private static final String POLL_FREQUENCY = "pollFrequency";
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceProviderRegistry providerRegistry;
+ private DeviceProviderRegistry providerRegistry;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ComponentConfigService componentConfigService;
+ private ComponentConfigService componentConfigService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected NetworkConfigRegistry cfgService;
+ private NetworkConfigRegistry cfgService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CoreService coreService;
+ private CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceService deviceService;
+ private DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DriverService driverService;
+ private DriverService driverService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MastershipService mastershipService;
+ private MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected PiPipeconfService piPipeconfService;
+ private PiPipeconfService piPipeconfService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
+ private ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LeadershipService leadershipService;
+ private LeadershipService leadershipService;
private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
@Property(name = POLL_FREQUENCY, intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
@@ -168,9 +173,9 @@
"default is 10 sec")
private int deviceAvailabilityPollFrequency = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS;
- protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
- protected static final String URI_SCHEME = "device";
- protected static final String CFG_SCHEME = "generalprovider";
+ private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
+ private static final String URI_SCHEME = "device";
+ private static final String CFG_SCHEME = "generalprovider";
private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
private static final int CORE_POOL_SIZE = 10;
private static final String UNKNOWN = "unknown";
@@ -187,25 +192,24 @@
private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
+ private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
- protected ScheduledExecutorService connectionExecutor
- = newScheduledThreadPool(CORE_POOL_SIZE,
- groupedThreads("onos/generaldeviceprovider-device",
- "connection-executor-%d", log));
- protected ScheduledExecutorService portStatsExecutor
- = newScheduledThreadPool(CORE_POOL_SIZE,
- groupedThreads("onos/generaldeviceprovider-port-stats",
- "port-stats-executor-%d", log));
- protected ScheduledExecutorService availabilityCheckExecutor
- = newScheduledThreadPool(CORE_POOL_SIZE,
- groupedThreads("onos/generaldeviceprovider-availability-check",
- "availability-check-executor-%d", log));
- protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
- protected DeviceProviderService providerService;
+ private ExecutorService connectionExecutor
+ = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
+ "onos/generaldeviceprovider-device-connect", "%d", log));
+ private ScheduledExecutorService portStatsExecutor
+ = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
+ "onos/generaldeviceprovider-port-stats", "%d", log));
+ private ScheduledExecutorService availabilityCheckExecutor
+ = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
+ "onos/generaldeviceprovider-availability-check", "%d", log));
+ private ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
+
+ private DeviceProviderService providerService;
private InternalDeviceListener deviceListener = new InternalDeviceListener();
- protected final ConfigFactory factory =
+ private final ConfigFactory factory =
new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
SubjectFactories.DEVICE_SUBJECT_FACTORY,
GeneralProviderDeviceConfig.class, CFG_SCHEME) {
@@ -215,8 +219,8 @@
}
};
- protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
- private ChannelListener channelListener = new InternalChannelListener();
+ private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
+ private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
@Activate
@@ -233,7 +237,8 @@
cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
.forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
//Initiating a periodic check to see if any device is available again and reconnect it.
- availabilityCheckExecutor.scheduleAtFixedRate(this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
+ availabilityCheckExecutor.scheduleAtFixedRate(
+ this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
deviceAvailabilityPollFrequency, TimeUnit.SECONDS);
modified(context);
log.info("Started");
@@ -255,12 +260,12 @@
Set<DeviceId> deviceSubjects =
cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
deviceSubjects.forEach(deviceId -> {
- if (!compareScheme(deviceId)) {
+ if (notMyScheme(deviceId)) {
// not under my scheme, skipping
log.debug("{} is not my scheme, skipping", deviceId);
return;
}
- scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, true));
+ scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, true));
});
}
}
@@ -299,17 +304,18 @@
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
log.info("Received role {} for device {}", newRole, deviceId);
- CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
- roleReply.thenAcceptAsync(mastership -> {
- providerService.receivedRoleReply(deviceId, newRole, mastership);
- if (!mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) != null) {
- scheduledTasks.get(deviceId).cancel(false);
- scheduledTasks.remove(deviceId);
- } else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) {
- scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, false));
- updatePortStatistics(deviceId);
- }
- });
+ requestedRoles.put(deviceId, newRole);
+ connectionExecutor.submit(() -> doRoleChanged(deviceId, newRole));
+ }
+
+ private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
+ DeviceHandshaker handshaker = getHandshaker(deviceId);
+ if (handshaker == null) {
+ log.warn("Null handshaker. Unable to notify new role {} to {}",
+ newRole, deviceId);
+ return;
+ }
+ handshaker.roleChanged(newRole);
}
@Override
@@ -321,9 +327,9 @@
return false;
}
- CompletableFuture<Boolean> reachable = handshaker.isReachable();
try {
- return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS);
+ return handshaker.isReachable()
+ .get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn("Device {} is not reachable {}", deviceId, e.getMessage());
log.debug("Exception", e);
@@ -358,11 +364,8 @@
@Override
public void triggerDisconnect(DeviceId deviceId) {
log.debug("Triggering disconnection of device {}", deviceId);
- connectionExecutor.execute(() -> {
- disconnectDevice(deviceId).whenComplete((success, ex) -> {
- checkAndConnect(deviceId);
- });
- });
+ connectionExecutor.execute(() -> disconnectDevice(deviceId)
+ .thenRunAsync(() -> checkAndConnect(deviceId)));
}
private DeviceHandshaker getHandshaker(DeviceId deviceId) {
@@ -438,7 +441,7 @@
connected.thenAcceptAsync(result -> {
if (result) {
- handshaker.addChannelListener(channelListener);
+ handshaker.addDeviceAgentListener(deviceAgentListener);
//Populated with the default values obtained by the driver
ChassisId cid = new ChassisId();
SparseAnnotations annotations = DefaultAnnotations.builder()
@@ -496,7 +499,7 @@
//Connecting to the device
handshaker.connect().thenAcceptAsync(result -> {
if (result) {
- handshaker.addChannelListener(channelListener);
+ handshaker.addDeviceAgentListener(deviceAgentListener);
handlePipeconf(deviceId, handshaker.handler().driver(), handshaker.data(), false);
}
});
@@ -583,32 +586,29 @@
private CompletableFuture<Boolean> disconnectDevice(DeviceId deviceId) {
log.info("Disconnecting for device {}", deviceId);
- CompletableFuture<Boolean> disconnectError = new CompletableFuture<>();
+ if (scheduledTasks.containsKey(deviceId)) {
+ scheduledTasks.remove(deviceId).cancel(true);
+ }
DeviceHandshaker handshaker = handshakers.remove(deviceId);
- if (handshaker != null) {
- handshaker.disconnect().thenAcceptAsync(result -> {
- if (result) {
- log.info("Disconnected device {}", deviceId);
- providerService.deviceDisconnected(deviceId);
- disconnectError.complete(true);
- } else {
- disconnectError.complete(false);
- log.warn("Device {} was unable to disconnect", deviceId);
- }
- });
- } else {
- //gracefully ignoring.
+
+ if (handshaker == null) {
+ // gracefully ignoring.
log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
- "shutdown of communication", deviceId);
- disconnectError.complete(false);
+ "shutdown of communication", deviceId);
+ return CompletableFuture.completedFuture(false);
}
- ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
- if (pollingStatisticsTask != null) {
- pollingStatisticsTask.cancel(true);
- scheduledTasks.remove(deviceId);
- }
- return disconnectError;
+
+ return handshaker.disconnect()
+ .thenApplyAsync(result -> {
+ if (result) {
+ log.info("Disconnected device {}", deviceId);
+ providerService.deviceDisconnected(deviceId);
+ } else {
+ log.warn("Device {} was unable to disconnect", deviceId);
+ }
+ return result;
+ });
}
//Needed to catch the exception in the executors since are not rethrown otherwise.
@@ -637,8 +637,8 @@
}
}
- private boolean compareScheme(DeviceId deviceId) {
- return deviceId.uri().getScheme().equals(URI_SCHEME);
+ private boolean notMyScheme(DeviceId deviceId) {
+ return !deviceId.uri().getScheme().equals(URI_SCHEME);
}
/**
@@ -650,7 +650,7 @@
public void event(NetworkConfigEvent event) {
DeviceId deviceId = (DeviceId) event.subject();
//Assuming that the deviceId comes with uri 'device:'
- if (!compareScheme(deviceId)) {
+ if (notMyScheme(deviceId)) {
// not under my scheme, skipping
log.debug("{} is not my scheme, skipping", deviceId);
return;
@@ -765,7 +765,7 @@
pipelineConfigured.remove(deviceId);
}
- private ScheduledFuture<?> scheduleStatistcsPolling(DeviceId deviceId, boolean randomize) {
+ private ScheduledFuture<?> scheduleStatsPolling(DeviceId deviceId, boolean randomize) {
int delay = 0;
if (randomize) {
delay = new SecureRandom().nextInt(10);
@@ -826,6 +826,34 @@
return present;
}
+ private void handleChannelClosed(DeviceId deviceId) {
+ disconnectDevice(deviceId).thenRunAsync(() -> {
+ // If local master, notify core. NOTE(review): disconnectDevice() already does so when disconnect succeeds.
+ if (mastershipService.isLocalMaster(deviceId)) {
+ log.info("Disconnecting device {}, due to channel closed event",
+ deviceId);
+ providerService.deviceDisconnected(deviceId);
+ }
+ });
+ }
+
+ private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
+ // Only responses to a role previously requested via roleChanged() are reported to the core.
+ if (!requestedRoles.containsKey(deviceId)) {
+ return;
+ }
+ providerService.receivedRoleReply(deviceId, requestedRoles.get(deviceId), response);
+ // Not master: cancel stats polling task, if any. Master: schedule it if absent, then updatePortStatistics().
+ if (!response.equals(MastershipRole.MASTER)
+ && scheduledTasks.get(deviceId) != null) {
+ scheduledTasks.remove(deviceId).cancel(false);
+ } else if (response.equals(MastershipRole.MASTER)
+ && scheduledTasks.get(deviceId) == null) {
+ scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
+ updatePortStatistics(deviceId);
+ }
+ }
+
/**
* Listener for core device events.
*/
@@ -834,11 +862,10 @@
public void event(DeviceEvent event) {
DeviceId deviceId = event.subject().id();
// FIXME handling for mastership change scenario missing?
-
// For now this is scheduled periodically, when streaming API will
// be available we check and base it on the streaming API (e.g. gNMI)
if (mastershipService.isLocalMaster(deviceId)) {
- scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, false));
+ scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
}
}
@@ -850,26 +877,37 @@
}
/**
- * Listener for device channel events.
+ * Listener for device agent events.
*/
- private class InternalChannelListener implements ChannelListener {
+ private class InternalDeviceAgentListener implements DeviceAgentListener {
@Override
- public void event(ChannelEvent event) {
+ public void event(DeviceAgentEvent event) {
DeviceId deviceId = event.subject();
- if (event.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
- //let's properly handle device disconnection
- CompletableFuture<Boolean> disconnection = disconnectDevice(deviceId);
- disconnection.thenAcceptAsync(result -> {
- //If master notifying of disconnection to the core.
- if (mastershipService.isLocalMaster(deviceId)) {
- log.info("Disconnecting unreachable device {}, due to error on channel", deviceId);
- providerService.deviceDisconnected(deviceId);
- }
- });
-
+ switch (event.type()) {
+ case CHANNEL_OPEN:
+ // Ignore.
+ break;
+ case CHANNEL_CLOSED:
+ handleChannelClosed(deviceId);
+ break;
+ case CHANNEL_ERROR:
+ // TODO evaluate other reaction to channel error.
+ log.warn("Received CHANNEL_ERROR from {}. Is the channel still open?",
+ deviceId);
+ break;
+ case ROLE_MASTER:
+ handleMastershipResponse(deviceId, MastershipRole.MASTER);
+ break;
+ case ROLE_STANDBY:
+ handleMastershipResponse(deviceId, MastershipRole.STANDBY);
+ break;
+ case ROLE_NONE:
+ handleMastershipResponse(deviceId, MastershipRole.NONE);
+ break;
+ default:
+ log.warn("Unrecognized device agent event {}", event.type());
}
- //TODO evaluate other type of reactions.
}
}