Add link eviction to gRPC Link SB.

- Now assumes (remote) LinkProvider to periodically report existing Link.
- Note: This eviction mechanism can be removed, once gRPC Link SB service was
  remodelled using streaming RPC.

Change-Id: I98f05f849b876cff9bbdb648e7ac79f900f4bfcb
diff --git a/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java b/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
index 62bdfb2..a44d539 100644
--- a/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
+++ b/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
@@ -16,6 +16,7 @@
 package org.onosproject.incubator.rpc.grpc;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static java.util.stream.Collectors.toList;
 import static org.onosproject.incubator.protobuf.net.ProtobufUtils.translate;
 import static org.onosproject.net.DeviceId.deviceId;
@@ -25,6 +26,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -36,6 +38,7 @@
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.util.Tools;
 import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc;
 import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
 import org.onosproject.grpc.net.device.DeviceService.DeviceConnected;
@@ -110,8 +113,11 @@
     private final Map<String, LinkProviderService> linkProviderServices = Maps.newConcurrentMap();
     private final Map<String, LinkProvider> linkProviders = Maps.newConcurrentMap();
 
+    private ScheduledExecutorService executor;
+
     @Activate
     protected void activate(ComponentContext context) throws IOException {
+        executor = newScheduledThreadPool(1, Tools.groupedThreads("grpc", "%d", log));
         modified(context);
 
         log.debug("Server starting on {}", listenPort);
@@ -130,6 +136,12 @@
 
     @Deactivate
     protected void deactivate() {
+        executor.shutdown();
+        try {
+            executor.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
 
         registeredProviders.stream()
             .forEach(deviceProviderRegistry::unregister);
@@ -182,6 +194,10 @@
         return linkProviderServices.computeIfAbsent(scheme, this::registerStubLinkProvider);
     }
 
+    protected ScheduledExecutorService getSharedExecutor() {
+        return executor;
+    }
+
     // RPC Server-side code
     // RPC session Factory
     /**
diff --git a/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/LinkProviderServiceServerProxy.java b/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/LinkProviderServiceServerProxy.java
index 608f6e3..435c650 100644
--- a/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/LinkProviderServiceServerProxy.java
+++ b/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/LinkProviderServiceServerProxy.java
@@ -17,8 +17,14 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.cache.RemovalListeners.asynchronous;
 import static org.onosproject.net.DeviceId.deviceId;
+import static org.onosproject.net.LinkKey.linkKey;
 
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
 import org.onosproject.grpc.net.Link.ConnectPoint.ElementIdCase;
 import org.onosproject.grpc.net.Link.LinkType;
 import org.onosproject.grpc.net.link.LinkProviderServiceRpcGrpc.LinkProviderServiceRpc;
@@ -29,6 +35,7 @@
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Link;
+import org.onosproject.net.LinkKey;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.SparseAnnotations;
 import org.onosproject.net.link.DefaultLinkDescription;
@@ -38,9 +45,13 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.api.client.repackaged.com.google.common.annotations.Beta;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
 
 import io.grpc.stub.StreamObserver;
 
+// Only single instance will be created and bound to gRPC LinkProviderService
 /**
  * Server-side implementation of gRPC version of LinkProviderService.
  */
@@ -48,15 +59,39 @@
 final class LinkProviderServiceServerProxy
         implements LinkProviderServiceRpc {
 
+    /**
+     * Silence time in seconds, until link gets treated as vanished.
+     */
+    private static final int EVICT_LIMIT = 3 * 3;
+
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final GrpcRemoteServiceServer server;
 
     // TODO implement aging mechanism to automatically remove
     // stale links reported by dead client, etc.
+    /**
+     * Evicting Cache to track last seen time.
+     */
+    private final Cache<Pair<String, LinkKey>, LinkDescription> lastSeen;
 
     LinkProviderServiceServerProxy(GrpcRemoteServiceServer server) {
         this.server = checkNotNull(server);
+        ScheduledExecutorService executor = server.getSharedExecutor();
+        lastSeen = CacheBuilder.newBuilder()
+                .expireAfterWrite(EVICT_LIMIT, TimeUnit.SECONDS)
+                .removalListener(asynchronous(this::onRemove, executor))
+                .build();
+
+        executor.scheduleWithFixedDelay(lastSeen::cleanUp,
+                                        EVICT_LIMIT, EVICT_LIMIT, TimeUnit.SECONDS);
+    }
+
+    private void onRemove(RemovalNotification<Pair<String, LinkKey>, LinkDescription> n) {
+        if (n.wasEvicted()) {
+            getLinkProviderServiceFor(n.getKey().getLeft())
+                .linkVanished(n.getValue());
+        }
     }
 
     /**
@@ -92,6 +127,7 @@
 
         LinkDescription linkDescription = translate(request.getLinkDescription());
         linkProviderService.linkDetected(linkDescription);
+        lastSeen.put(Pair.of(scheme, linkKey(linkDescription)), linkDescription);
     }
 
     @Override
@@ -123,6 +159,7 @@
         case LINK_DESCRIPTION:
             LinkDescription desc = translate(request.getLinkDescription());
             getLinkProviderServiceFor(scheme).linkVanished(desc);
+            lastSeen.invalidate(Pair.of(scheme, linkKey(desc)));
             break;
         case SUBJECT_NOT_SET:
         default: