More improvements and bugfixes in P4Runtime subsystem
Most notably, we fix a bug in which some nodes were not able to find
pipeconf-specific behaviors for a given device. The problem is not
completely solved, but it is mitigated.
There is a race condition: the GDP updates the cfg with the merged
driver name before advertising the device to the core, so some nodes
might receive the cfg update after the device has been advertised.
We mitigate the problem by performing the pipeline deploy (a slow
operation) after the cfg update, giving other nodes more time to catch
up. Perhaps we should listen for cfg update events before advertising
the device to the core?
Also:
- NPE when getting P4Runtime client
- Detect if a base driver is already merged in pipeconf manager
- Longer timeouts in P4Runtime driver and protocol (for slow networks)
- Configurable timeout in P4Runtime driver and GDP
- NPE when adding/removing device agent listeners in P4Runtime handshaker
- Various exceptions due to race conditions in GDP when disconnecting
devices (by serializing disconnect tasks per device)
- NPE when cancelling polling tasks in GDP
- Refactored PipeconfService to distinguish between driver merge,
pipeconf map update, and cfg update (now performed in the GDP)
- Fixed PipeconfManagerTest, which was not testing driver behaviours
- Use Guava striped locks where possible (more memory-efficient than maps,
and, unlike expiring caches, with strict atomicity guarantees).
Change-Id: I30f3887541ba0fd44439a86885e9821ac565b64c
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index fc76a23..836ed7a 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -36,10 +36,10 @@
* channel to the device is open, false otherwise.
* <p>
* Only one client can exist for the same device ID. Calls to this method
- * are idempotent for the same [device ID, address, port, p4DeviceId] tuple,
- * i.e. returns true if such client already exists but a new one is not
- * created. Throws an {@link IllegalStateException} if a client for device
- * ID already exists but for different [address, port, p4DeviceId].
+ * are idempotent for the same [device ID, address, port, p4DeviceId]
+ * tuple, i.e. returns true if such client already exists but a new one is
+ * not created. Throws an {@link IllegalStateException} if a client for
+ * device ID already exists but for different [address, port, p4DeviceId].
*
* @param deviceId device identifier
* @param serverAddr address of the P4Runtime server
@@ -49,18 +49,17 @@
* open
* @throws IllegalStateException if a client already exists for this device
* ID but for different [address, port,
- * p4DeviceId].
+ * p4DeviceId] tuple.
*/
boolean createClient(DeviceId deviceId, String serverAddr, int serverPort,
long p4DeviceId);
/**
- * Returns a client to operate on the given device.
+ * Returns a client to operate on the given device, or null if a client for
+ * such device does not exist in this controller.
*
* @param deviceId device identifier
- * @return client instance
- * @throws IllegalStateException if no client exists for the given device
- * identifier
+ * @return client instance or null
*/
P4RuntimeClient getClient(DeviceId deviceId);
@@ -92,7 +91,7 @@
* @return true if a client was created and is able to contact the P4Runtime
* server, false otherwise.
*/
- boolean isReacheable(DeviceId deviceId);
+ boolean isReachable(DeviceId deviceId);
/**
* Adds a listener for device agent events.
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index bd688f7..a20a0e6 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -105,8 +105,8 @@
*/
final class P4RuntimeClientImpl implements P4RuntimeClient {
- // Timeout in seconds to obtain the client lock.
- private static final int LOCK_TIMEOUT = 10;
+ // Timeout in seconds to obtain the request lock.
+ private static final int LOCK_TIMEOUT = 60;
private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index d2773b2..b35b999 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -16,10 +16,8 @@
package org.onosproject.p4runtime.ctl;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolverProvider;
@@ -50,10 +48,10 @@
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
@@ -66,21 +64,16 @@
extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
implements P4RuntimeController {
- private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
private final Logger log = getLogger(getClass());
private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
- private final Map<DeviceId, ClientKey> deviceIdToClientKey = Maps.newHashMap();
- private final Map<ClientKey, P4RuntimeClient> clientKeyToClient = Maps.newHashMap();
+
+ private final Map<DeviceId, ClientKey> clientKeys = Maps.newHashMap();
+ private final Map<ClientKey, P4RuntimeClient> clients = Maps.newHashMap();
private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+
private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
- private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
- .expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
- .build(new CacheLoader<DeviceId, ReadWriteLock>() {
- @Override
- public ReadWriteLock load(DeviceId deviceId) {
- return new ReentrantReadWriteLock();
- }
- });
+ private final Striped<Lock> stripedLocks = Striped.lock(30);
+
private DistributedElectionIdGenerator electionIdGenerator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -99,6 +92,11 @@
@Deactivate
public void deactivate() {
+ clientKeys.keySet().forEach(this::removeClient);
+ clientKeys.clear();
+ clients.clear();
+ channelIds.clear();
+ deviceAgentListeners.clear();
grpcController = null;
electionIdGenerator.destroy();
electionIdGenerator = null;
@@ -111,44 +109,40 @@
int serverPort, long p4DeviceId) {
checkNotNull(deviceId);
checkNotNull(serverAddr);
+ checkArgument(serverPort > 0, "Invalid server port");
- ClientKey newKey = new ClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
+ return withDeviceLock(() -> doCreateClient(
+ deviceId, serverAddr, serverPort, p4DeviceId), deviceId);
+ }
+
+ private boolean doCreateClient(DeviceId deviceId, String serverAddr,
+ int serverPort, long p4DeviceId) {
+
+ ClientKey clientKey = new ClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
+
+ if (clientKeys.containsKey(deviceId)) {
+ final ClientKey existingKey = clientKeys.get(deviceId);
+ if (clientKey.equals(existingKey)) {
+ log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
+ deviceId, serverAddr, serverPort, p4DeviceId);
+ return true;
+ } else {
+ throw new IllegalStateException(
+ "A client for the same device ID but different " +
+ "server endpoints already exists");
+ }
+ }
+
+ log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
+ deviceId, serverAddr, serverPort, p4DeviceId);
+
+ GrpcChannelId channelId = GrpcChannelId.of(
+ clientKey.deviceId(), "p4runtime-" + clientKey);
ManagedChannelBuilder channelBuilder = NettyChannelBuilder
.forAddress(serverAddr, serverPort)
- .usePlaintext(true);
-
- deviceLocks.getUnchecked(deviceId).writeLock().lock();
-
- try {
- if (deviceIdToClientKey.containsKey(deviceId)) {
- final ClientKey existingKey = deviceIdToClientKey.get(deviceId);
- if (newKey.equals(existingKey)) {
- log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
- deviceId, serverAddr, serverPort, p4DeviceId);
- return true;
- } else {
- throw new IllegalStateException(
- "A client for the same device ID but different " +
- "server endpoints already exists");
- }
- } else {
- log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
- deviceId, serverAddr, serverPort, p4DeviceId);
- return doCreateClient(newKey, channelBuilder);
- }
- } finally {
- deviceLocks.getUnchecked(deviceId).writeLock().unlock();
- }
- }
-
- private boolean doCreateClient(ClientKey clientKey, ManagedChannelBuilder channelBuilder) {
-
- GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(),
- "p4runtime-" + clientKey.p4DeviceId());
-
- // Channel defaults.
- channelBuilder.nameResolverFactory(nameResolverProvider);
+ .usePlaintext(true)
+ .nameResolverFactory(nameResolverProvider);
ManagedChannel channel;
try {
@@ -162,72 +156,69 @@
P4RuntimeClient client = new P4RuntimeClientImpl(
clientKey.deviceId(), clientKey.p4DeviceId(), channel, this);
+ clientKeys.put(clientKey.deviceId(), clientKey);
+ clients.put(clientKey, client);
channelIds.put(clientKey.deviceId(), channelId);
- deviceIdToClientKey.put(clientKey.deviceId(), clientKey);
- clientKeyToClient.put(clientKey, client);
return true;
}
@Override
public P4RuntimeClient getClient(DeviceId deviceId) {
+ if (deviceId == null) {
+ return null;
+ }
+ return withDeviceLock(() -> doGetClient(deviceId), deviceId);
+ }
- deviceLocks.getUnchecked(deviceId).readLock().lock();
-
- try {
- if (!deviceIdToClientKey.containsKey(deviceId)) {
- return null;
- } else {
- return clientKeyToClient.get(deviceIdToClientKey.get(deviceId));
- }
- } finally {
- deviceLocks.getUnchecked(deviceId).readLock().unlock();
+ private P4RuntimeClient doGetClient(DeviceId deviceId) {
+ if (!clientKeys.containsKey(deviceId)) {
+ return null;
+ } else {
+ return clients.get(clientKeys.get(deviceId));
}
}
@Override
public void removeClient(DeviceId deviceId) {
-
- deviceLocks.getUnchecked(deviceId).writeLock().lock();
- try {
- if (deviceIdToClientKey.containsKey(deviceId)) {
- final ClientKey clientKey = deviceIdToClientKey.get(deviceId);
- clientKeyToClient.remove(clientKey).shutdown();
- grpcController.disconnectChannel(channelIds.get(deviceId));
- deviceIdToClientKey.remove(deviceId);
- channelIds.remove(deviceId);
- }
- } finally {
- deviceLocks.getUnchecked(deviceId).writeLock().unlock();
+ if (deviceId == null) {
+ return;
}
+ withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
+ }
+
+ private Void doRemoveClient(DeviceId deviceId) {
+ if (clientKeys.containsKey(deviceId)) {
+ final ClientKey clientKey = clientKeys.get(deviceId);
+ clients.get(clientKey).shutdown();
+ grpcController.disconnectChannel(channelIds.get(deviceId));
+ clientKeys.remove(deviceId);
+ clients.remove(clientKey);
+ channelIds.remove(deviceId);
+ }
+ return null;
}
@Override
public boolean hasClient(DeviceId deviceId) {
- deviceLocks.getUnchecked(deviceId).readLock().lock();
-
- try {
- return deviceIdToClientKey.containsKey(deviceId);
- } finally {
- deviceLocks.getUnchecked(deviceId).readLock().unlock();
- }
+ return clientKeys.containsKey(deviceId);
}
@Override
- public boolean isReacheable(DeviceId deviceId) {
-
- deviceLocks.getUnchecked(deviceId).readLock().lock();
-
- try {
- if (!deviceIdToClientKey.containsKey(deviceId)) {
- log.debug("No client for {}, can't check for reachability", deviceId);
- return false;
- }
- // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
- return grpcController.isChannelOpen(channelIds.get(deviceId));
- } finally {
- deviceLocks.getUnchecked(deviceId).readLock().unlock();
+ public boolean isReachable(DeviceId deviceId) {
+ if (deviceId == null) {
+ return false;
}
+ return withDeviceLock(() -> doIsReacheable(deviceId), deviceId);
+ }
+
+ private boolean doIsReacheable(DeviceId deviceId) {
+ // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
+ if (!clientKeys.containsKey(deviceId)) {
+ log.debug("No client for {}, can't check for reachability", deviceId);
+ return false;
+ }
+ return grpcController.isChannelOpen(channelIds.get(deviceId));
}
@Override
@@ -244,6 +235,16 @@
});
}
+ private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
+ final Lock lock = stripedLocks.get(deviceId);
+ lock.lock();
+ try {
+ return task.get();
+ } finally {
+ lock.unlock();
+ }
+ }
+
BigInteger newMasterElectionId(DeviceId deviceId) {
return electionIdGenerator.generate(deviceId);
}
@@ -274,7 +275,7 @@
agentEventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
break;
case ERROR:
- agentEventType = !isReacheable(deviceId)
+ agentEventType = !isReachable(deviceId)
? DeviceAgentEvent.Type.CHANNEL_CLOSED
: DeviceAgentEvent.Type.CHANNEL_ERROR;
break;