[ONOS-7334] Using Cache Loader to remove stale entries in p4runtime

Change-Id: Ieead6e199faf23f5fa316516adab659d3e192950
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index ca9392f..2126dfa 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -16,9 +16,11 @@
 
 package org.onosproject.drivers.p4runtime;
 
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import io.grpc.StatusRuntimeException;
 import org.onlab.util.SharedExecutors;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeTableMirror;
@@ -43,12 +45,12 @@
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
@@ -90,11 +92,18 @@
     // FIXME: set to true as soon as the feature is implemented in P4Runtime.
     private boolean readAllDirectCounters = false;
 
-    // Needed to synchronize operations over the same table entry.
-    // FIXME: locks should be removed when unused (hint use cache with timeout)
-    private static final ConcurrentMap<PiTableEntryHandle, Lock>
-            ENTRY_LOCKS = Maps.newConcurrentMap();
+    private static final int TABLE_ENTRY_LOCK_EXPIRE_TIME_IN_MIN = 10;
 
+    // Needed to synchronize operations over the same table entry.
+    private static final LoadingCache<PiTableEntryHandle, Lock>
+            ENTRY_LOCKS = CacheBuilder.newBuilder()
+            .expireAfterAccess(TABLE_ENTRY_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
+            .build(new CacheLoader<PiTableEntryHandle, Lock>() {
+                @Override
+                public Lock load(PiTableEntryHandle handle) {
+                    return new ReentrantLock();
+                }
+            });
     private PiPipelineModel pipelineModel;
     private PiPipelineInterpreter interpreter;
     private P4RuntimeTableMirror tableMirror;
@@ -275,15 +284,14 @@
                     .of(deviceId, piEntryToApply);
 
             // Serialize operations over the same match key/table/device ID.
-            final Lock lock = ENTRY_LOCKS.computeIfAbsent(handle, k -> new ReentrantLock());
-            lock.lock();
+            ENTRY_LOCKS.getUnchecked(handle).lock();
             try {
                 if (applyEntry(handle, piEntryToApply,
                                ruleToApply, driverOperation)) {
                     result.add(ruleToApply);
                 }
             } finally {
-                lock.unlock();
+                ENTRY_LOCKS.getUnchecked(handle).unlock();
             }
         }
 
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 57703a1..36faa0a 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -16,6 +16,9 @@
 
 package org.onosproject.p4runtime.ctl;
 
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
 import com.google.common.collect.Maps;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
@@ -41,6 +44,7 @@
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -58,12 +62,20 @@
         implements P4RuntimeController {
 
     private static final String P4R_ELECTION = "p4runtime-election";
+    private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
     private final Logger log = getLogger(getClass());
     private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
     private final Map<DeviceId, P4RuntimeClient> clients = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
-    // TODO: should use a cache to delete unused locks.
-    private final Map<DeviceId, ReadWriteLock> deviceLocks = Maps.newConcurrentMap();
+    private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
+            .expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
+            .build(new CacheLoader<DeviceId, ReadWriteLock>() {
+                @Override
+                public ReadWriteLock load(DeviceId deviceId) {
+                    return new ReentrantReadWriteLock();
+                }
+            });
+
     private AtomicCounter electionIdGenerator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -94,9 +106,7 @@
         checkNotNull(deviceId);
         checkNotNull(channelBuilder);
 
-        deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
-        deviceLocks.get(deviceId).writeLock().lock();
-
+        deviceLocks.getUnchecked(deviceId).writeLock().lock();
         log.info("Creating client for {} (with internal device id {})...", deviceId, p4DeviceId);
 
         try {
@@ -106,7 +116,7 @@
                 return doCreateClient(deviceId, p4DeviceId, channelBuilder);
             }
         } finally {
-            deviceLocks.get(deviceId).writeLock().unlock();
+            deviceLocks.getUnchecked(deviceId).writeLock().unlock();
         }
     }
 
@@ -136,21 +146,19 @@
     @Override
     public P4RuntimeClient getClient(DeviceId deviceId) {
 
-        deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
-        deviceLocks.get(deviceId).readLock().lock();
+        deviceLocks.getUnchecked(deviceId).readLock().lock();
 
         try {
             return clients.get(deviceId);
         } finally {
-            deviceLocks.get(deviceId).readLock().unlock();
+            deviceLocks.getUnchecked(deviceId).readLock().unlock();
         }
     }
 
     @Override
     public void removeClient(DeviceId deviceId) {
 
-        deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
-        deviceLocks.get(deviceId).writeLock().lock();
+        deviceLocks.getUnchecked(deviceId).writeLock().lock();
 
         try {
             if (clients.containsKey(deviceId)) {
@@ -160,28 +168,26 @@
                 channelIds.remove(deviceId);
             }
         } finally {
-            deviceLocks.get(deviceId).writeLock().unlock();
+           deviceLocks.getUnchecked(deviceId).writeLock().lock();
         }
     }
 
     @Override
     public boolean hasClient(DeviceId deviceId) {
 
-        deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
-        deviceLocks.get(deviceId).readLock().lock();
+        deviceLocks.getUnchecked(deviceId).readLock().lock();
 
         try {
             return clients.containsKey(deviceId);
         } finally {
-            deviceLocks.get(deviceId).readLock().unlock();
+            deviceLocks.getUnchecked(deviceId).readLock().unlock();
         }
     }
 
     @Override
     public boolean isReacheable(DeviceId deviceId) {
 
-        deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
-        deviceLocks.get(deviceId).readLock().lock();
+        deviceLocks.getUnchecked(deviceId).readLock().lock();
 
         try {
             if (!clients.containsKey(deviceId)) {
@@ -191,7 +197,7 @@
 
             return grpcController.isChannelOpen(channelIds.get(deviceId));
         } finally {
-            deviceLocks.get(deviceId).readLock().unlock();
+            deviceLocks.getUnchecked(deviceId).readLock().unlock();
         }
     }
 
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PipeconfHelper.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PipeconfHelper.java
index af738e2..a49fdd8 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PipeconfHelper.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PipeconfHelper.java
@@ -16,6 +16,8 @@
 
 package org.onosproject.p4runtime.ctl;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Maps;
 import com.google.protobuf.ExtensionRegistry;
 import com.google.protobuf.TextFormat;
@@ -28,6 +30,8 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.P4_INFO_TEXT;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -37,10 +41,12 @@
  */
 final class PipeconfHelper {
 
+    private static final int P4INFO_BROWSER_EXPIRE_TIME_IN_MIN = 10;
     private static final Logger log = getLogger(PipeconfHelper.class);
 
-    // TODO: consider implementing this via a cache that expires unused browsers.
-    private static final Map<PiPipeconfId, P4InfoBrowser> BROWSERS = Maps.newConcurrentMap();
+    private static final Cache<PiPipeconfId, P4InfoBrowser> BROWSERS = CacheBuilder.newBuilder()
+            .expireAfterAccess(P4INFO_BROWSER_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
+            .build();
     private static final Map<PiPipeconfId, P4Info> P4INFOS = Maps.newConcurrentMap();
 
     private PipeconfHelper() {
@@ -83,13 +89,18 @@
      * @return P4Info browser or null
      */
     static P4InfoBrowser getP4InfoBrowser(PiPipeconf pipeconf) {
-        return BROWSERS.computeIfAbsent(pipeconf.id(), (pipeconfId) -> {
-            P4Info p4info = PipeconfHelper.getP4Info(pipeconf);
-            if (p4info == null) {
-                return null;
-            } else {
-                return new P4InfoBrowser(p4info);
-            }
-        });
+        try {
+            return BROWSERS.get(pipeconf.id(), () -> {
+                P4Info p4info = PipeconfHelper.getP4Info(pipeconf);
+                if (p4info == null) {
+                    return null;
+                } else {
+                    return new P4InfoBrowser(p4info);
+                }
+            });
+        } catch (ExecutionException e) {
+            log.error("Exception while accessing the P4InfoBrowser cache", e);
+            return null;
+        }
     }
 }