[ONOS-7334] Using Cache Loader to remove stale entries in p4runtime
Change-Id: Ieead6e199faf23f5fa316516adab659d3e192950
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index ca9392f..2126dfa 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -16,9 +16,11 @@
package org.onosproject.drivers.p4runtime;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import io.grpc.StatusRuntimeException;
import org.onlab.util.SharedExecutors;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeTableMirror;
@@ -43,12 +45,12 @@
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -90,11 +92,18 @@
// FIXME: set to true as soon as the feature is implemented in P4Runtime.
private boolean readAllDirectCounters = false;
- // Needed to synchronize operations over the same table entry.
- // FIXME: locks should be removed when unused (hint use cache with timeout)
- private static final ConcurrentMap<PiTableEntryHandle, Lock>
- ENTRY_LOCKS = Maps.newConcurrentMap();
+ private static final int TABLE_ENTRY_LOCK_EXPIRE_TIME_IN_MIN = 10;
+ // Needed to synchronize operations over the same table entry.
+ private static final LoadingCache<PiTableEntryHandle, Lock>
+ ENTRY_LOCKS = CacheBuilder.newBuilder()
+ .expireAfterAccess(TABLE_ENTRY_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
+ .build(new CacheLoader<PiTableEntryHandle, Lock>() {
+ @Override
+ public Lock load(PiTableEntryHandle handle) {
+ return new ReentrantLock();
+ }
+ });
private PiPipelineModel pipelineModel;
private PiPipelineInterpreter interpreter;
private P4RuntimeTableMirror tableMirror;
@@ -275,15 +284,14 @@
.of(deviceId, piEntryToApply);
// Serialize operations over the same match key/table/device ID.
- final Lock lock = ENTRY_LOCKS.computeIfAbsent(handle, k -> new ReentrantLock());
- lock.lock();
+ ENTRY_LOCKS.getUnchecked(handle).lock();
try {
if (applyEntry(handle, piEntryToApply,
ruleToApply, driverOperation)) {
result.add(ruleToApply);
}
} finally {
- lock.unlock();
+ ENTRY_LOCKS.getUnchecked(handle).unlock();
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 57703a1..36faa0a 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -16,6 +16,9 @@
package org.onosproject.p4runtime.ctl;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
import com.google.common.collect.Maps;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -41,6 +44,7 @@
import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -58,12 +62,20 @@
implements P4RuntimeController {
private static final String P4R_ELECTION = "p4runtime-election";
+ private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
private final Logger log = getLogger(getClass());
private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
private final Map<DeviceId, P4RuntimeClient> clients = Maps.newHashMap();
private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
- // TODO: should use a cache to delete unused locks.
- private final Map<DeviceId, ReadWriteLock> deviceLocks = Maps.newConcurrentMap();
+ private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
+ .expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
+ .build(new CacheLoader<DeviceId, ReadWriteLock>() {
+ @Override
+ public ReadWriteLock load(DeviceId deviceId) {
+ return new ReentrantReadWriteLock();
+ }
+ });
+
private AtomicCounter electionIdGenerator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -94,9 +106,7 @@
checkNotNull(deviceId);
checkNotNull(channelBuilder);
- deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
- deviceLocks.get(deviceId).writeLock().lock();
-
+ deviceLocks.getUnchecked(deviceId).writeLock().lock();
log.info("Creating client for {} (with internal device id {})...", deviceId, p4DeviceId);
try {
@@ -106,7 +116,7 @@
return doCreateClient(deviceId, p4DeviceId, channelBuilder);
}
} finally {
- deviceLocks.get(deviceId).writeLock().unlock();
+ deviceLocks.getUnchecked(deviceId).writeLock().unlock();
}
}
@@ -136,21 +146,19 @@
@Override
public P4RuntimeClient getClient(DeviceId deviceId) {
- deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
- deviceLocks.get(deviceId).readLock().lock();
+ deviceLocks.getUnchecked(deviceId).readLock().lock();
try {
return clients.get(deviceId);
} finally {
- deviceLocks.get(deviceId).readLock().unlock();
+ deviceLocks.getUnchecked(deviceId).readLock().unlock();
}
}
@Override
public void removeClient(DeviceId deviceId) {
- deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
- deviceLocks.get(deviceId).writeLock().lock();
+ deviceLocks.getUnchecked(deviceId).writeLock().lock();
try {
if (clients.containsKey(deviceId)) {
@@ -160,28 +168,26 @@
channelIds.remove(deviceId);
}
} finally {
- deviceLocks.get(deviceId).writeLock().unlock();
+ deviceLocks.getUnchecked(deviceId).writeLock().lock();
}
}
@Override
public boolean hasClient(DeviceId deviceId) {
- deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
- deviceLocks.get(deviceId).readLock().lock();
+ deviceLocks.getUnchecked(deviceId).readLock().lock();
try {
return clients.containsKey(deviceId);
} finally {
- deviceLocks.get(deviceId).readLock().unlock();
+ deviceLocks.getUnchecked(deviceId).readLock().unlock();
}
}
@Override
public boolean isReacheable(DeviceId deviceId) {
- deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
- deviceLocks.get(deviceId).readLock().lock();
+ deviceLocks.getUnchecked(deviceId).readLock().lock();
try {
if (!clients.containsKey(deviceId)) {
@@ -191,7 +197,7 @@
return grpcController.isChannelOpen(channelIds.get(deviceId));
} finally {
- deviceLocks.get(deviceId).readLock().unlock();
+ deviceLocks.getUnchecked(deviceId).readLock().unlock();
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PipeconfHelper.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PipeconfHelper.java
index af738e2..a49fdd8 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PipeconfHelper.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PipeconfHelper.java
@@ -16,6 +16,8 @@
package org.onosproject.p4runtime.ctl;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.TextFormat;
@@ -28,6 +30,8 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.P4_INFO_TEXT;
import static org.slf4j.LoggerFactory.getLogger;
@@ -37,10 +41,12 @@
*/
final class PipeconfHelper {
+ private static final int P4INFO_BROWSER_EXPIRE_TIME_IN_MIN = 10;
private static final Logger log = getLogger(PipeconfHelper.class);
- // TODO: consider implementing this via a cache that expires unused browsers.
- private static final Map<PiPipeconfId, P4InfoBrowser> BROWSERS = Maps.newConcurrentMap();
+ private static final Cache<PiPipeconfId, P4InfoBrowser> BROWSERS = CacheBuilder.newBuilder()
+ .expireAfterAccess(P4INFO_BROWSER_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
+ .build();
private static final Map<PiPipeconfId, P4Info> P4INFOS = Maps.newConcurrentMap();
private PipeconfHelper() {
@@ -83,13 +89,18 @@
* @return P4Info browser or null
*/
static P4InfoBrowser getP4InfoBrowser(PiPipeconf pipeconf) {
- return BROWSERS.computeIfAbsent(pipeconf.id(), (pipeconfId) -> {
- P4Info p4info = PipeconfHelper.getP4Info(pipeconf);
- if (p4info == null) {
- return null;
- } else {
- return new P4InfoBrowser(p4info);
- }
- });
+ try {
+ return BROWSERS.get(pipeconf.id(), () -> {
+ P4Info p4info = PipeconfHelper.getP4Info(pipeconf);
+ if (p4info == null) {
+ return null;
+ } else {
+ return new P4InfoBrowser(p4info);
+ }
+ });
+ } catch (ExecutionException e) {
+ log.error("Exception while accessing the P4InfoBrowser cache", e);
+ return null;
+ }
}
}