Add support for enabling/disabling ports for gNMI devices
This change also includes:
- Refactoring of gNMI protocol+driver to take advantage of the recent
changes to the gRPC protocol subsystem (e.g. no more locking, start RPC
with timeouts, etc.).
- Fixed Stratum driver to work after GeneralDeviceProvider refactoring
- Updated bmv2.py to generate ChassisConfig for stratum_bmv2
- Fixed portstate command to use the same port name as in the store
Change-Id: I0dad3bc73e4b6d907b5cf6b7b9a2852943226be7
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
index 796ef55..5d14bbc 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
@@ -38,7 +38,7 @@
*
* @return the capability response
*/
- CompletableFuture<CapabilityResponse> capability();
+ CompletableFuture<CapabilityResponse> capabilities();
/**
* Retrieves a snapshot of data from the device.
@@ -57,15 +57,17 @@
CompletableFuture<SetResponse> set(SetRequest request);
/**
- * Subscribes to a given specific gNMI path.
+ * Starts a subscription for the given request. Updates will be notified by
+ * the controller via {@link GnmiEvent.Type#UPDATE} events. The client
+ * guarantees that a Subscription RPC is active at all times despite channel
+ * or server failures, unless {@link #unsubscribe()} is called.
*
* @param request the subscribe request
- * @return true if subscribe successfully; false otherwise
*/
- boolean subscribe(SubscribeRequest request);
+ void subscribe(SubscribeRequest request);
/**
- * Terminates the subscription channel of this device.
+ * Terminates any Subscribe RPC active.
*/
- void terminateSubscriptionChannel();
+ void unsubscribe();
}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
index 6645f39..8d93a63 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -28,107 +28,157 @@
import gnmi.gNMIGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiClientKey;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
-import org.slf4j.Logger;
import java.util.concurrent.CompletableFuture;
-
-import static org.slf4j.LoggerFactory.getLogger;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
/**
* Implementation of gNMI client.
*/
public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
- private static final PathElem DUMMY_PATH_ELEM = PathElem.newBuilder().setName("onos-gnmi-test").build();
- private static final Path DUMMY_PATH = Path.newBuilder().addElem(DUMMY_PATH_ELEM).build();
- private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
- private final Logger log = getLogger(getClass());
- private final gNMIGrpc.gNMIBlockingStub blockingStub;
- private GnmiSubscriptionManager gnmiSubscriptionManager;
+
+ private static final int RPC_TIMEOUT_SECONDS = 10;
+
+ private static final GetRequest PING_REQUEST = GetRequest.newBuilder().addPath(
+ Path.newBuilder().addElem(
+ PathElem.newBuilder().setName("onos-gnmi-ping").build()
+ ).build()).build();
+
+ private GnmiSubscriptionManager subscribeManager;
GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
super(clientKey, managedChannel, false, controller);
- this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
- this.gnmiSubscriptionManager =
- new GnmiSubscriptionManager(managedChannel, deviceId, controller);
+ this.subscribeManager =
+ new GnmiSubscriptionManager(this, deviceId, controller);
}
@Override
- public CompletableFuture<CapabilityResponse> capability() {
- return supplyInContext(this::doCapability, "capability");
+ public CompletableFuture<CapabilityResponse> capabilities() {
+ final CompletableFuture<CapabilityResponse> future = new CompletableFuture<>();
+ execRpc(s -> s.capabilities(
+ CapabilityRequest.getDefaultInstance(),
+ unaryObserver(future, CapabilityResponse.getDefaultInstance(),
+ "capabilities request"))
+ );
+ return future;
}
@Override
public CompletableFuture<GetResponse> get(GetRequest request) {
- return supplyInContext(() -> doGet(request), "get");
+ final CompletableFuture<GetResponse> future = new CompletableFuture<>();
+ execRpc(s -> s.get(request, unaryObserver(
+ future, GetResponse.getDefaultInstance(), "GET"))
+ );
+ return future;
}
@Override
public CompletableFuture<SetResponse> set(SetRequest request) {
- return supplyInContext(() -> doSet(request), "set");
+ final CompletableFuture<SetResponse> future = new CompletableFuture<>();
+ execRpc(s -> s.set(request, unaryObserver(
+ future, SetResponse.getDefaultInstance(), "SET"))
+ );
+ return future;
+ }
+
+ private <RES> StreamObserver<RES> unaryObserver(
+ final CompletableFuture<RES> future,
+ final RES defaultResponse,
+ final String opDescription) {
+ return new StreamObserver<RES>() {
+ @Override
+ public void onNext(RES value) {
+ future.complete(value);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ handleRpcError(t, opDescription);
+ future.complete(defaultResponse);
+ }
+
+ @Override
+ public void onCompleted() {
+ // Ignore. Unary call.
+ }
+ };
}
@Override
- public boolean subscribe(SubscribeRequest request) {
- return gnmiSubscriptionManager.subscribe(request);
+ public void subscribe(SubscribeRequest request) {
+ subscribeManager.subscribe(request);
}
@Override
- public void terminateSubscriptionChannel() {
- gnmiSubscriptionManager.complete();
+ public void unsubscribe() {
+ subscribeManager.unsubscribe();
}
@Override
public CompletableFuture<Boolean> probeService() {
- return supplyInContext(this::doServiceAvailable, "isServiceAvailable");
+ final CompletableFuture<Boolean> future = new CompletableFuture<>();
+ final StreamObserver<GetResponse> responseObserver = new StreamObserver<GetResponse>() {
+ @Override
+ public void onNext(GetResponse value) {
+ future.complete(true);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // This gRPC call should throw INVALID_ARGUMENT status exception
+ // since "/onos-gnmi-ping" path does not exists in any config
+ // model For other status code such as UNIMPLEMENT, means the
+ // gNMI service is not available on the device.
+ future.complete(Status.fromThrowable(t).getCode()
+ == Status.Code.INVALID_ARGUMENT);
+ }
+
+ @Override
+ public void onCompleted() {
+ // Ignore. Unary call.
+ }
+ };
+ execRpc(s -> s.get(PING_REQUEST, responseObserver));
+ return future;
}
@Override
- protected Void doShutdown() {
- gnmiSubscriptionManager.shutdown();
- return super.doShutdown();
+ public void shutdown() {
+ subscribeManager.shutdown();
+ super.shutdown();
}
- private CapabilityResponse doCapability() {
- CapabilityRequest request = CapabilityRequest.newBuilder().build();
- try {
- return blockingStub.capabilities(request);
- } catch (StatusRuntimeException e) {
- log.warn("Unable to get capability from {}: {}", deviceId, e.getMessage());
- return CapabilityResponse.getDefaultInstance();
+ /**
+ * Forces execution of an RPC in a cancellable context with a timeout.
+ *
+ * @param stubConsumer P4Runtime stub consumer
+ */
+ private void execRpc(Consumer<gNMIGrpc.gNMIStub> stubConsumer) {
+ if (log.isTraceEnabled()) {
+ log.trace("Executing RPC with timeout {} seconds (context deadline {})...",
+ RPC_TIMEOUT_SECONDS, context().getDeadline());
}
+ runInCancellableContext(() -> stubConsumer.accept(
+ gNMIGrpc.newStub(channel)
+ .withDeadlineAfter(RPC_TIMEOUT_SECONDS, TimeUnit.SECONDS)));
}
- private GetResponse doGet(GetRequest request) {
- try {
- return blockingStub.get(request);
- } catch (StatusRuntimeException e) {
- log.warn("Unable to get data from {}: {}", deviceId, e.getMessage());
- return GetResponse.getDefaultInstance();
+ /**
+ * Forces execution of an RPC in a cancellable context with no timeout.
+ *
+ * @param stubConsumer P4Runtime stub consumer
+ */
+ void execRpcNoTimeout(Consumer<gNMIGrpc.gNMIStub> stubConsumer) {
+ if (log.isTraceEnabled()) {
+ log.trace("Executing RPC with no timeout (context deadline {})...",
+ context().getDeadline());
}
- }
-
- private SetResponse doSet(SetRequest request) {
- try {
- return blockingStub.set(request);
- } catch (StatusRuntimeException e) {
- log.warn("Unable to set data to {}: {}", deviceId, e.getMessage());
- return SetResponse.getDefaultInstance();
- }
- }
-
- private boolean doServiceAvailable() {
- try {
- return blockingStub.get(DUMMY_REQUEST) != null;
- } catch (StatusRuntimeException e) {
- // This gRPC call should throw INVALID_ARGUMENT status exception
- // since "/onos-gnmi-test" path does not exists in any config model
- // For other status code such as UNIMPLEMENT, means the gNMI
- // service is not available on the device.
- return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
- }
+ runInCancellableContext(() -> stubConsumer.accept(
+ gNMIGrpc.newStub(channel)));
}
}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
index 156cb43..73bb807 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
@@ -17,9 +17,8 @@
package org.onosproject.gnmi.ctl;
+import com.google.common.util.concurrent.Futures;
import gnmi.Gnmi;
-import gnmi.gNMIGrpc;
-import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
@@ -29,9 +28,10 @@
import org.slf4j.Logger;
import java.net.ConnectException;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
import static java.lang.String.format;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -39,108 +39,110 @@
import static org.slf4j.LoggerFactory.getLogger;
/**
- * A manager for the gNMI stream channel that opportunistically creates
- * new stream RCP stubs (e.g. when one fails because of errors) and posts
- * subscribe events via the gNMI controller.
+ * A manager for the gNMI Subscribe RPC that opportunistically starts new RPC
+ * (e.g. when one fails because of errors) and posts subscribe events via the
+ * gNMI controller.
*/
final class GnmiSubscriptionManager {
- /**
- * The state of the subscription manager.
- */
- enum State {
-
- /**
- * Subscription not exists.
- */
- INIT,
-
- /**
- * Exists a subscription and channel opened.
- */
- SUBSCRIBED,
-
- /**
- * Exists a subscription, but the channel does not open.
- */
- RETRYING,
- }
-
// FIXME: make this configurable
private static final long DEFAULT_RECONNECT_DELAY = 5; // Seconds
+
private static final Logger log = getLogger(GnmiSubscriptionManager.class);
- private final ManagedChannel channel;
+
+ private final GnmiClientImpl client;
private final DeviceId deviceId;
private final GnmiControllerImpl controller;
-
private final StreamObserver<Gnmi.SubscribeResponse> responseObserver;
- private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
+
+ private final ScheduledExecutorService streamCheckerExecutor =
+ newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-subscribe-check", "%d", log));
+ private Future<?> checkTask;
private ClientCallStreamObserver<Gnmi.SubscribeRequest> requestObserver;
private Gnmi.SubscribeRequest existingSubscription;
- private final ScheduledExecutorService streamCheckerExecutor =
- newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-probe", "%d", log));
+ private AtomicBoolean active = new AtomicBoolean(false);
- GnmiSubscriptionManager(ManagedChannel channel, DeviceId deviceId,
+ GnmiSubscriptionManager(GnmiClientImpl client, DeviceId deviceId,
GnmiControllerImpl controller) {
- this.channel = channel;
+ this.client = client;
this.deviceId = deviceId;
this.controller = controller;
this.responseObserver = new InternalStreamResponseObserver();
- streamCheckerExecutor.scheduleAtFixedRate(this::checkGnmiStream, 0,
- DEFAULT_RECONNECT_DELAY,
- TimeUnit.SECONDS);
+ }
+
+ void subscribe(Gnmi.SubscribeRequest request) {
+ synchronized (this) {
+ if (existingSubscription != null) {
+ if (existingSubscription.equals(request)) {
+ // Nothing to do. We are already subscribed for the same
+ // request.
+ log.debug("Ignoring re-subscription to same request",
+ deviceId);
+ return;
+ }
+ log.debug("Cancelling existing subscription for {} before " +
+ "starting a new one", deviceId);
+ complete();
+ }
+ existingSubscription = request;
+ sendSubscribeRequest();
+ if (checkTask != null) {
+ checkTask = streamCheckerExecutor.scheduleAtFixedRate(
+ this::checkSubscription, 0,
+ DEFAULT_RECONNECT_DELAY,
+ TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ void unsubscribe() {
+ synchronized (this) {
+ if (checkTask != null) {
+ checkTask.cancel(false);
+ checkTask = null;
+ }
+ existingSubscription = null;
+ complete();
+ }
}
public void shutdown() {
- log.info("gNMI subscription manager for device {} shutdown", deviceId);
- streamCheckerExecutor.shutdown();
- complete();
+ log.debug("Shutting down gNMI subscription manager for {}", deviceId);
+ unsubscribe();
+ streamCheckerExecutor.shutdownNow();
}
- private void initIfRequired() {
+ private void checkSubscription() {
+ synchronized (this) {
+ if (existingSubscription != null && !active.get()) {
+ if (client.isServerReachable() || Futures.getUnchecked(client.probeService())) {
+ log.info("Re-starting Subscribe RPC for {}...", deviceId);
+ sendSubscribeRequest();
+ } else {
+ log.debug("Not restarting Subscribe RPC for {}, server is NOT reachable",
+ deviceId);
+ }
+ }
+ }
+ }
+
+ private void sendSubscribeRequest() {
if (requestObserver == null) {
- log.debug("Creating new stream channel for {}...", deviceId);
- requestObserver = (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
- gNMIGrpc.newStub(channel).subscribe(responseObserver);
-
+ log.debug("Starting new Subscribe RPC for {}...", deviceId);
+ client.execRpcNoTimeout(
+ s -> requestObserver =
+ (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
+ s.subscribe(responseObserver)
+ );
}
- }
-
- boolean subscribe(Gnmi.SubscribeRequest request) {
- synchronized (state) {
- if (state.get() == State.SUBSCRIBED) {
- // Cancel subscription when we need to subscribe new thing
- complete();
- }
-
- existingSubscription = request;
- return send(request);
- }
- }
-
- private boolean send(Gnmi.SubscribeRequest value) {
- initIfRequired();
- try {
- requestObserver.onNext(value);
- state.set(State.SUBSCRIBED);
- return true;
- } catch (Throwable ex) {
- if (ex instanceof StatusRuntimeException) {
- log.warn("Unable to send subscribe request to {}: {}",
- deviceId, ex.getMessage());
- } else {
- log.warn("Exception while sending subscribe request to {}",
- deviceId, ex);
- }
- state.set(State.RETRYING);
- return false;
- }
+ requestObserver.onNext(existingSubscription);
+ active.set(true);
}
public void complete() {
- synchronized (state) {
- state.set(State.INIT);
+ synchronized (this) {
+ active.set(false);
if (requestObserver != null) {
requestObserver.onCompleted();
requestObserver.cancel("Terminated", null);
@@ -149,21 +151,8 @@
}
}
- private void checkGnmiStream() {
- synchronized (state) {
- if (state.get() != State.RETRYING) {
- // No need to retry if the state is not RETRYING
- return;
- }
- log.info("Try reconnecting gNMI stream to device {}", deviceId);
-
- complete();
- send(existingSubscription);
- }
- }
-
/**
- * Handles messages received from the device on the stream channel.
+ * Handles messages received from the device on the Subscribe RPC.
*/
private final class InternalStreamResponseObserver
implements StreamObserver<Gnmi.SubscribeResponse> {
@@ -171,41 +160,51 @@
@Override
public void onNext(Gnmi.SubscribeResponse message) {
try {
- log.debug("Received message on stream channel from {}: {}",
- deviceId, message.toString());
- GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
- GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
- controller.postEvent(event);
+ if (log.isTraceEnabled()) {
+ log.trace("Received SubscribeResponse from {}: {}",
+ deviceId, message.toString());
+ }
+ controller.postEvent(new GnmiEvent(GnmiEvent.Type.UPDATE, new GnmiUpdate(
+ deviceId, message.getUpdate(), message.getSyncResponse())));
} catch (Throwable ex) {
- log.error("Exception while processing stream message from {}",
- deviceId, ex);
+ log.error("Exception processing SubscribeResponse from " + deviceId,
+ ex);
}
}
@Override
public void onError(Throwable throwable) {
+ complete();
if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException) throwable;
if (sre.getStatus().getCause() instanceof ConnectException) {
- log.warn("Device {} is unreachable ({})",
- deviceId, sre.getCause().getMessage());
+ log.warn("{} is unreachable ({})",
+ deviceId, sre.getCause().getMessage());
} else {
- log.warn("Received error on stream channel for {}: {}",
- deviceId, throwable.getMessage());
+ log.warn("Error on Subscribe RPC for {}: {}",
+ deviceId, throwable.getMessage());
}
} else {
- log.warn(format("Received exception on stream channel for %s",
- deviceId), throwable);
+ log.error(format("Exception on Subscribe RPC for %s",
+ deviceId), throwable);
}
- state.set(State.RETRYING);
}
@Override
public void onCompleted() {
- log.warn("Stream channel for {} has completed", deviceId);
- state.set(State.RETRYING);
+ complete();
+ log.warn("Subscribe RPC for {} has completed", deviceId);
}
}
+
+ @Override
+ protected void finalize() throws Throwable {
+ if (!streamCheckerExecutor.isShutdown()) {
+ log.error("Finalizing object but executor is still active! BUG? Shutting down...");
+ shutdown();
+ }
+ super.finalize();
+ }
}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
index d529529..7957006 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
@@ -28,11 +28,8 @@
/**
* Shutdowns the client by terminating any active RPC.
- *
- * @return a completable future to signal the completion of the shutdown
- * procedure
*/
- CompletableFuture<Void> shutdown();
+ void shutdown();
/**
* This method provides a coarse modelling of gRPC channel {@link
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
index 117c6e3..befa334 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -26,18 +26,10 @@
import org.onosproject.net.device.DeviceAgentEvent;
import org.slf4j.Logger;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onlab.util.Tools.groupedThreads;
+import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -45,18 +37,11 @@
*/
public abstract class AbstractGrpcClient implements GrpcClient {
- // Timeout in seconds to obtain the request lock.
- private static final int LOCK_TIMEOUT = 60;
- private static final int DEFAULT_THREAD_POOL_SIZE = 10;
-
protected final Logger log = getLogger(getClass());
- private final Lock requestLock = new ReentrantLock();
private final Context.CancellableContext cancellableContext =
Context.current().withCancellation();
- private final Executor contextExecutor;
- protected final ExecutorService executorService;
protected final DeviceId deviceId;
protected final ManagedChannel channel;
private final boolean persistent;
@@ -81,9 +66,6 @@
this.channel = channel;
this.persistent = persistent;
this.controller = controller;
- this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
- "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
- this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING);
}
@@ -106,27 +88,15 @@
}
@Override
- public CompletableFuture<Void> shutdown() {
+ public void shutdown() {
if (cancellableContext.isCancelled()) {
log.warn("Context is already cancelled, " +
"ignoring request to shutdown for {}...", deviceId);
- return CompletableFuture.completedFuture(null);
+ return;
}
- return CompletableFuture.supplyAsync(this::doShutdown);
- }
-
- protected Void doShutdown() {
log.warn("Shutting down client for {}...", deviceId);
cancellableContext.cancel(new InterruptedException(
"Requested client shutdown"));
- this.executorService.shutdownNow();
- try {
- executorService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("Executor service didn't shutdown in time.");
- Thread.currentThread().interrupt();
- }
- return null;
}
/**
@@ -152,66 +122,22 @@
return cancellableContext;
}
- /**
- * Equivalent of supplyWithExecutor using the gRPC context executor of this
- * client, such that if the context is cancelled (e.g. client shutdown) the
- * RPC is automatically cancelled.
- *
- * @param <U> return type of supplier
- * @param supplier the supplier to be executed
- * @param opDescription the description of this supplier
- * @return CompletableFuture includes the result of supplier
- * @throws IllegalStateException if client has been shut down
- */
- protected <U> CompletableFuture<U> supplyInContext(
- Supplier<U> supplier, String opDescription) {
- return supplyWithExecutor(supplier, opDescription, contextExecutor);
- }
-
- /**
- * Submits a task for async execution via the given executor. All tasks
- * submitted with this method will be executed sequentially.
- *
- * @param <U> return type of supplier
- * @param supplier the supplier to be executed
- * @param opDescription the description of this supplier
- * @param executor the executor to execute this supplier
- * @return CompletableFuture includes the result of supplier
- * @throws IllegalStateException if client has been shut down
- */
- private <U> CompletableFuture<U> supplyWithExecutor(
- Supplier<U> supplier, String opDescription, Executor executor) {
- if (this.cancellableContext.isCancelled()) {
- throw new IllegalStateException("Client has been shut down");
+ protected void handleRpcError(Throwable throwable, String opDescription) {
+ if (throwable instanceof StatusRuntimeException) {
+ final StatusRuntimeException sre = (StatusRuntimeException) throwable;
+ final String logMsg;
+ if (sre.getCause() == null) {
+ logMsg = sre.getMessage();
+ } else {
+ logMsg = format("%s (%s)", sre.getMessage(), sre.getCause().toString());
+ }
+ log.warn("Error while performing {} on {}: {}",
+ opDescription, deviceId, logMsg);
+ log.debug("", throwable);
+ return;
}
- return CompletableFuture.supplyAsync(() -> {
- // TODO: explore a more relaxed locking strategy.
- try {
- if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
- log.error("LOCK TIMEOUT! This is likely a deadlock, "
- + "please debug (executing {})",
- opDescription);
- throw new IllegalThreadStateException("Lock timeout");
- }
- } catch (InterruptedException e) {
- log.warn("Thread interrupted while waiting for lock (executing {})",
- opDescription);
- throw new IllegalStateException(e);
- }
- try {
- return supplier.get();
- } catch (StatusRuntimeException ex) {
- log.warn("Unable to execute {} on {}: {}",
- opDescription, deviceId, ex.toString());
- throw ex;
- } catch (Throwable ex) {
- log.error("Exception in client of {}, executing {}",
- deviceId, opDescription, ex);
- throw ex;
- } finally {
- requestLock.unlock();
- }
- }, executor);
+ log.error(format("Exception while performing %s on %s",
+ opDescription, deviceId), throwable);
}
private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
index d06e3fc..d54ef97 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
@@ -42,7 +42,6 @@
import java.util.function.Consumer;
import static com.google.common.base.Preconditions.checkNotNull;
-import static java.lang.String.format;
import static p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest.ResponseType.COOKIE_ONLY;
/**
@@ -96,9 +95,9 @@
}
@Override
- protected Void doShutdown() {
+ public void shutdown() {
streamClient.closeSession();
- return super.doShutdown();
+ super.shutdown();
}
@Override
@@ -191,6 +190,22 @@
return future;
}
+ @Override
+ protected void handleRpcError(Throwable throwable, String opDescription) {
+ if (throwable instanceof StatusRuntimeException) {
+ checkGrpcException((StatusRuntimeException) throwable);
+ }
+ super.handleRpcError(throwable, opDescription);
+ }
+
+ private void checkGrpcException(StatusRuntimeException sre) {
+ if (sre.getStatus().getCode() == Status.Code.PERMISSION_DENIED) {
+ // Notify upper layers that this node is not master.
+ controller.postEvent(new DeviceAgentEvent(
+ DeviceAgentEvent.Type.NOT_MASTER, deviceId));
+ }
+ }
+
/**
* Returns the P4Runtime-internal device ID associated with this client.
*
@@ -252,38 +267,4 @@
runInCancellableContext(() -> stubConsumer.accept(
P4RuntimeGrpc.newStub(channel)));
}
-
- /**
- * Logs the error and checks it for any condition that might be of interest
- * for the controller.
- *
- * @param throwable throwable
- * @param opDescription operation description for logging
- */
- void handleRpcError(Throwable throwable, String opDescription) {
- if (throwable instanceof StatusRuntimeException) {
- final StatusRuntimeException sre = (StatusRuntimeException) throwable;
- checkGrpcException(sre);
- final String logMsg;
- if (sre.getCause() == null) {
- logMsg = sre.getMessage();
- } else {
- logMsg = format("%s (%s)", sre.getMessage(), sre.getCause().toString());
- }
- log.warn("Error while performing {} on {}: {}",
- opDescription, deviceId, logMsg);
- log.debug("", throwable);
- return;
- }
- log.error(format("Exception while performing %s on %s",
- opDescription, deviceId), throwable);
- }
-
- private void checkGrpcException(StatusRuntimeException sre) {
- if (sre.getStatus().getCode() == Status.Code.PERMISSION_DENIED) {
- // Notify upper layers that this node is not master.
- controller.postEvent(new DeviceAgentEvent(
- DeviceAgentEvent.Type.NOT_MASTER, deviceId));
- }
- }
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
index 1bf195c..df4844b 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
@@ -303,25 +303,10 @@
void send(StreamMessageRequest value) {
synchronized (this) {
initIfRequired();
- doSend(value);
- }
- }
-
- private void doSend(StreamMessageRequest value) {
- try {
requestObserver.onNext(value);
// Optimistically set the session as open. In case of errors, it
// will be closed by the response stream observer.
streamChannelManager.signalOpen();
- } catch (Throwable ex) {
- if (ex instanceof StatusRuntimeException) {
- log.warn("Unable to send {} to {}: {}",
- value.getUpdateCase().toString(), deviceId, ex.getMessage());
- } else {
- log.error("Exception while sending {} to {}: {}",
- value.getUpdateCase().toString(), deviceId, ex);
- }
- teardown();
}
}