Add support for enabling/disabling ports for gNMI devices
This change also includes:
- Refactoring of gNMI protocol+driver to take advantage of the recent
changes to the gRPC protocol subsystem (e.g. no more locking, start RPC
with timeouts, etc.).
- Fixed Stratum driver to work after GeneralDeviceProvider refactoring
- Updated bmv2.py to generate ChassisConfig for stratum_bmv2
- Fixed portstate command to use the same port name as in the store
Change-Id: I0dad3bc73e4b6d907b5cf6b7b9a2852943226be7
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
index d529529..7957006 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
@@ -28,11 +28,8 @@
/**
* Shutdowns the client by terminating any active RPC.
- *
- * @return a completable future to signal the completion of the shutdown
- * procedure
*/
- CompletableFuture<Void> shutdown();
+ void shutdown();
/**
* This method provides a coarse modelling of gRPC channel {@link
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
index 117c6e3..befa334 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -26,18 +26,10 @@
import org.onosproject.net.device.DeviceAgentEvent;
import org.slf4j.Logger;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onlab.util.Tools.groupedThreads;
+import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -45,18 +37,11 @@
*/
public abstract class AbstractGrpcClient implements GrpcClient {
- // Timeout in seconds to obtain the request lock.
- private static final int LOCK_TIMEOUT = 60;
- private static final int DEFAULT_THREAD_POOL_SIZE = 10;
-
protected final Logger log = getLogger(getClass());
- private final Lock requestLock = new ReentrantLock();
private final Context.CancellableContext cancellableContext =
Context.current().withCancellation();
- private final Executor contextExecutor;
- protected final ExecutorService executorService;
protected final DeviceId deviceId;
protected final ManagedChannel channel;
private final boolean persistent;
@@ -81,9 +66,6 @@
this.channel = channel;
this.persistent = persistent;
this.controller = controller;
- this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
- "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
- this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING);
}
@@ -106,27 +88,15 @@
}
@Override
- public CompletableFuture<Void> shutdown() {
+ public void shutdown() {
if (cancellableContext.isCancelled()) {
log.warn("Context is already cancelled, " +
"ignoring request to shutdown for {}...", deviceId);
- return CompletableFuture.completedFuture(null);
+ return;
}
- return CompletableFuture.supplyAsync(this::doShutdown);
- }
-
- protected Void doShutdown() {
log.warn("Shutting down client for {}...", deviceId);
cancellableContext.cancel(new InterruptedException(
"Requested client shutdown"));
- this.executorService.shutdownNow();
- try {
- executorService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("Executor service didn't shutdown in time.");
- Thread.currentThread().interrupt();
- }
- return null;
}
/**
@@ -152,66 +122,22 @@
return cancellableContext;
}
- /**
- * Equivalent of supplyWithExecutor using the gRPC context executor of this
- * client, such that if the context is cancelled (e.g. client shutdown) the
- * RPC is automatically cancelled.
- *
- * @param <U> return type of supplier
- * @param supplier the supplier to be executed
- * @param opDescription the description of this supplier
- * @return CompletableFuture includes the result of supplier
- * @throws IllegalStateException if client has been shut down
- */
- protected <U> CompletableFuture<U> supplyInContext(
- Supplier<U> supplier, String opDescription) {
- return supplyWithExecutor(supplier, opDescription, contextExecutor);
- }
-
- /**
- * Submits a task for async execution via the given executor. All tasks
- * submitted with this method will be executed sequentially.
- *
- * @param <U> return type of supplier
- * @param supplier the supplier to be executed
- * @param opDescription the description of this supplier
- * @param executor the executor to execute this supplier
- * @return CompletableFuture includes the result of supplier
- * @throws IllegalStateException if client has been shut down
- */
- private <U> CompletableFuture<U> supplyWithExecutor(
- Supplier<U> supplier, String opDescription, Executor executor) {
- if (this.cancellableContext.isCancelled()) {
- throw new IllegalStateException("Client has been shut down");
+ protected void handleRpcError(Throwable throwable, String opDescription) {
+ if (throwable instanceof StatusRuntimeException) {
+ final StatusRuntimeException sre = (StatusRuntimeException) throwable;
+ final String logMsg;
+ if (sre.getCause() == null) {
+ logMsg = sre.getMessage();
+ } else {
+ logMsg = format("%s (%s)", sre.getMessage(), sre.getCause().toString());
+ }
+ log.warn("Error while performing {} on {}: {}",
+ opDescription, deviceId, logMsg);
+ log.debug("", throwable);
+ return;
}
- return CompletableFuture.supplyAsync(() -> {
- // TODO: explore a more relaxed locking strategy.
- try {
- if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
- log.error("LOCK TIMEOUT! This is likely a deadlock, "
- + "please debug (executing {})",
- opDescription);
- throw new IllegalThreadStateException("Lock timeout");
- }
- } catch (InterruptedException e) {
- log.warn("Thread interrupted while waiting for lock (executing {})",
- opDescription);
- throw new IllegalStateException(e);
- }
- try {
- return supplier.get();
- } catch (StatusRuntimeException ex) {
- log.warn("Unable to execute {} on {}: {}",
- opDescription, deviceId, ex.toString());
- throw ex;
- } catch (Throwable ex) {
- log.error("Exception in client of {}, executing {}",
- deviceId, opDescription, ex);
- throw ex;
- } finally {
- requestLock.unlock();
- }
- }, executor);
+ log.error(format("Exception while performing %s on %s",
+ opDescription, deviceId), throwable);
}
private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,