Add link eviction to gRPC Link SB.
- Now assumes (remote) LinkProvider to periodically report existing Link.
- Note: This eviction mechanism can be removed, once gRPC Link SB service was
remodelled using streaming RPC.
Change-Id: I98f05f849b876cff9bbdb648e7ac79f900f4bfcb
diff --git a/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java b/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
index 62bdfb2..a44d539 100644
--- a/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
+++ b/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java
@@ -16,6 +16,7 @@
package org.onosproject.incubator.rpc.grpc;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.stream.Collectors.toList;
import static org.onosproject.incubator.protobuf.net.ProtobufUtils.translate;
import static org.onosproject.net.DeviceId.deviceId;
@@ -25,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -36,6 +38,7 @@
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.util.Tools;
import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc;
import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
import org.onosproject.grpc.net.device.DeviceService.DeviceConnected;
@@ -110,8 +113,11 @@
private final Map<String, LinkProviderService> linkProviderServices = Maps.newConcurrentMap();
private final Map<String, LinkProvider> linkProviders = Maps.newConcurrentMap();
+ private ScheduledExecutorService executor;
+
@Activate
protected void activate(ComponentContext context) throws IOException {
+ executor = newScheduledThreadPool(1, Tools.groupedThreads("grpc", "%d", log));
modified(context);
log.debug("Server starting on {}", listenPort);
@@ -130,6 +136,12 @@
@Deactivate
protected void deactivate() {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
registeredProviders.stream()
.forEach(deviceProviderRegistry::unregister);
@@ -182,6 +194,10 @@
return linkProviderServices.computeIfAbsent(scheme, this::registerStubLinkProvider);
}
+ protected ScheduledExecutorService getSharedExecutor() {
+ return executor;
+ }
+
// RPC Server-side code
// RPC session Factory
/**