Improve scalability of P4Runtime subsystem
The P4Runtime client was hanging (deadlock) on a master arbitration
request. As such, all other requests (e.g. table write) were waiting
for the client's request lock to become available.
Apart from fixing those deadlocks, this patch brings a number of
improvements that together allow running networks of 100+ P4Runtime
devices on a single ONOS instance (previously only ~20 devices).
Includes:
- Asynchronous mastership handling in DeviceHandshaker (as defined in
the P4Runtime and OpenFlow spec)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspected deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Re-purposed ChannelEvent to DeviceAgentEvent to carry also mastership
response events
- Fixed IntelliJ warnings
- Various code and log clean-ups
Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 987356b..d2773b2 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -35,20 +35,21 @@
import org.onosproject.grpc.api.GrpcChannelId;
import org.onosproject.grpc.api.GrpcController;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.ChannelEvent;
-import org.onosproject.net.device.ChannelListener;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.api.P4RuntimeEventListener;
-import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
+import java.math.BigInteger;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -65,14 +66,13 @@
extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
implements P4RuntimeController {
- private static final String P4R_ELECTION = "p4runtime-election";
private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
private final Logger log = getLogger(getClass());
private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
private final Map<DeviceId, ClientKey> deviceIdToClientKey = Maps.newHashMap();
private final Map<ClientKey, P4RuntimeClient> clientKeyToClient = Maps.newHashMap();
private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
- private final Map<DeviceId, List<ChannelListener>> channelListeners = Maps.newConcurrentMap();
+ private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
.expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
.build(new CacheLoader<DeviceId, ReadWriteLock>() {
@@ -81,8 +81,7 @@
return new ReentrantReadWriteLock();
}
});
-
- private AtomicCounter electionIdGenerator;
+ private DistributedElectionIdGenerator electionIdGenerator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private GrpcController grpcController;
@@ -93,8 +92,7 @@
@Activate
public void activate() {
eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
- electionIdGenerator = storageService.getAtomicCounter(P4R_ELECTION);
-
+ electionIdGenerator = new DistributedElectionIdGenerator(storageService);
log.info("Started");
}
@@ -102,6 +100,8 @@
@Deactivate
public void deactivate() {
grpcController = null;
+ electionIdGenerator.destroy();
+ electionIdGenerator = null;
eventDispatcher.removeSink(P4RuntimeEvent.class);
log.info("Stopped");
}
@@ -119,13 +119,13 @@
.usePlaintext(true);
deviceLocks.getUnchecked(deviceId).writeLock().lock();
- log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
- deviceId, serverAddr, serverPort, p4DeviceId);
try {
if (deviceIdToClientKey.containsKey(deviceId)) {
final ClientKey existingKey = deviceIdToClientKey.get(deviceId);
if (newKey.equals(existingKey)) {
+ log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
+ deviceId, serverAddr, serverPort, p4DeviceId);
return true;
} else {
throw new IllegalStateException(
@@ -133,6 +133,8 @@
"server endpoints already exists");
}
} else {
+ log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
+ deviceId, serverAddr, serverPort, p4DeviceId);
return doCreateClient(newKey, channelBuilder);
}
} finally {
@@ -187,12 +189,11 @@
public void removeClient(DeviceId deviceId) {
deviceLocks.getUnchecked(deviceId).writeLock().lock();
-
try {
if (deviceIdToClientKey.containsKey(deviceId)) {
final ClientKey clientKey = deviceIdToClientKey.get(deviceId);
- grpcController.disconnectChannel(channelIds.get(deviceId));
clientKeyToClient.remove(clientKey).shutdown();
+ grpcController.disconnectChannel(channelIds.get(deviceId));
deviceIdToClientKey.remove(deviceId);
channelIds.remove(deviceId);
}
@@ -222,7 +223,7 @@
log.debug("No client for {}, can't check for reachability", deviceId);
return false;
}
-
+ // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
return grpcController.isChannelOpen(channelIds.get(deviceId));
} finally {
deviceLocks.getUnchecked(deviceId).readLock().unlock();
@@ -230,67 +231,73 @@
}
@Override
- public long getNewMasterElectionId() {
- return electionIdGenerator.incrementAndGet();
+    public void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+        // Create the per-device listener list atomically on first registration,
+        // in a single map operation and without allocating a throw-away list
+        // on every call (as putIfAbsent followed by get would).
+        deviceAgentListeners.computeIfAbsent(deviceId, did -> new CopyOnWriteArrayList<>())
+                .add(listener);
}
@Override
- public void addChannelListener(DeviceId deviceId, ChannelListener listener) {
- channelListeners.compute(deviceId, (devId, listeners) -> {
- List<ChannelListener> newListeners;
- if (listeners != null) {
- newListeners = listeners;
- } else {
- newListeners = new ArrayList<>();
- }
- newListeners.add(listener);
- return newListeners;
+    public void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+        // Nothing happens for a device that has no listener list; the list
+        // itself stays mapped even after the listener is removed from it.
+        deviceAgentListeners.computeIfPresent(deviceId, (id, registered) -> {
+            registered.remove(listener);
+            return registered;
});
}
- @Override
- public void removeChannelListener(DeviceId deviceId, ChannelListener listener) {
- channelListeners.compute(deviceId, (devId, listeners) -> {
- if (listeners != null) {
- listeners.remove(listener);
- return listeners;
- } else {
- log.debug("Device {} has no listener registered", deviceId);
- return null;
- }
- });
+    /**
+     * Produces a new election ID for the given device by delegating to the
+     * election ID generator.
+     *
+     * @param deviceId device for which the election ID is requested
+     * @return the election ID returned by the generator
+     */
+    BigInteger newMasterElectionId(DeviceId deviceId) {
+        final BigInteger electionId = electionIdGenerator.generate(deviceId);
+        return electionId;
}
void postEvent(P4RuntimeEvent event) {
- if (event.type().equals(P4RuntimeEvent.Type.CHANNEL_EVENT)) {
- DefaultChannelEvent channelError = (DefaultChannelEvent) event.subject();
- DeviceId deviceId = event.subject().deviceId();
- ChannelEvent channelEvent = null;
- //If disconnection is already known we propagate it.
- if (channelError.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
- channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
- channelError.throwable());
- } else if (channelError.type().equals(ChannelEvent.Type.CHANNEL_ERROR)) {
- //If we don't know what the error is we check for reachability
- if (!isReacheable(deviceId)) {
- //if false the channel has disconnected
- channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
- channelError.throwable());
- } else {
- // else we propagate the event.
- channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_ERROR, channelError.deviceId(),
- channelError.throwable());
- }
- }
- //Ignoring CHANNEL_CONNECTED
- if (channelEvent != null && channelListeners.get(deviceId) != null) {
- for (ChannelListener listener : channelListeners.get(deviceId)) {
- listener.event(channelEvent);
- }
- }
- } else {
- post(event);
+ switch (event.type()) {
+ case CHANNEL_EVENT:
+ handleChannelEvent(event);
+ break;
+ case ARBITRATION_RESPONSE:
+ handleArbitrationReply(event);
+ break;
+ default:
+ post(event);
+ break;
}
}
+    /**
+     * Translates a P4Runtime channel event into the matching device agent
+     * event and delivers it to the listeners of the affected device.
+     * OPEN maps to CHANNEL_OPEN and CLOSED to CHANNEL_CLOSED; an ERROR is
+     * reported as CHANNEL_CLOSED when the device is not reachable and as
+     * CHANNEL_ERROR otherwise. Any other type is logged and dropped.
+     *
+     * @param event P4Runtime event whose subject is a ChannelEvent
+     */
+    private void handleChannelEvent(P4RuntimeEvent event) {
+        final ChannelEvent subject = (ChannelEvent) event.subject();
+        final DeviceId did = subject.deviceId();
+        final DeviceAgentEvent.Type translated;
+        switch (subject.type()) {
+            case OPEN:
+                translated = DeviceAgentEvent.Type.CHANNEL_OPEN;
+                break;
+            case CLOSED:
+                translated = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+                break;
+            case ERROR:
+                // The reachability check decides whether the error is
+                // surfaced as-is or as a closed channel.
+                if (!isReacheable(did)) {
+                    translated = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+                } else {
+                    translated = DeviceAgentEvent.Type.CHANNEL_ERROR;
+                }
+                break;
+            default:
+                log.warn("Unrecognized channel event type {}", subject.type());
+                return;
+        }
+        final DeviceAgentEvent agentEvent = new DeviceAgentEvent(translated, did);
+        postDeviceAgentEvent(did, agentEvent);
+    }
+
+    /**
+     * Converts an arbitration response into a role event and delivers it to
+     * the listeners of the device: ROLE_MASTER when the response reports
+     * mastership, ROLE_STANDBY otherwise.
+     *
+     * @param event P4Runtime event whose subject is an ArbitrationResponse
+     */
+    private void handleArbitrationReply(P4RuntimeEvent event) {
+        final DeviceId deviceId = event.subject().deviceId();
+        final ArbitrationResponse reply = (ArbitrationResponse) event.subject();
+        final DeviceAgentEvent.Type role;
+        if (reply.isMaster()) {
+            role = DeviceAgentEvent.Type.ROLE_MASTER;
+        } else {
+            role = DeviceAgentEvent.Type.ROLE_STANDBY;
+        }
+        final DeviceAgentEvent roleEvent = new DeviceAgentEvent(role, reply.deviceId());
+        postDeviceAgentEvent(deviceId, roleEvent);
+    }
+
+    /**
+     * Delivers the given event to every listener registered for the device.
+     * Does nothing when no listener list is mapped for the device.
+     *
+     * @param deviceId device whose listeners should be notified
+     * @param event    event to deliver
+     */
+    private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
+        if (!deviceAgentListeners.containsKey(deviceId)) {
+            return;
+        }
+        for (DeviceAgentListener listener : deviceAgentListeners.get(deviceId)) {
+            listener.event(event);
+        }
+    }
}