Add support for enabling/disabling ports for gNMI devices

This change also includes:
- Refactoring of gNMI protocol+driver to take advantage of the recent
changes to the gRPC protocol subsystem (e.g. no more locking, start RPC
with timeouts, etc.).
- Fixed Stratum driver to work after GeneralDeviceProvider refactoring
- Updated bmv2.py to generate ChassisConfig for stratum_bmv2
- Fixed portstate command to use the same port name as in the store

Change-Id: I0dad3bc73e4b6d907b5cf6b7b9a2852943226be7
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
index 796ef55..5d14bbc 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
@@ -38,7 +38,7 @@
      *
      * @return the capability response
      */
-    CompletableFuture<CapabilityResponse> capability();
+    CompletableFuture<CapabilityResponse> capabilities();
 
     /**
      * Retrieves a snapshot of data from the device.
@@ -57,15 +57,17 @@
     CompletableFuture<SetResponse> set(SetRequest request);
 
     /**
-     * Subscribes to a given specific gNMI path.
+     * Starts a subscription for the given request. Updates will be notified by
+     * the controller via {@link GnmiEvent.Type#UPDATE} events. The client
+     * guarantees that a Subscription RPC is active at all times despite channel
+     * or server failures, unless {@link #unsubscribe()} is called.
      *
      * @param request the subscribe request
-     * @return true if subscribe successfully; false otherwise
      */
-    boolean subscribe(SubscribeRequest request);
+    void subscribe(SubscribeRequest request);
 
     /**
-     * Terminates the subscription channel of this device.
+     * Terminates any active Subscribe RPC.
      */
-    void terminateSubscriptionChannel();
+    void unsubscribe();
 }
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
index 6645f39..8d93a63 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -28,107 +28,157 @@
 import gnmi.gNMIGrpc;
 import io.grpc.ManagedChannel;
 import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
 import org.onosproject.gnmi.api.GnmiClient;
 import org.onosproject.gnmi.api.GnmiClientKey;
 import org.onosproject.grpc.ctl.AbstractGrpcClient;
-import org.slf4j.Logger;
 
 import java.util.concurrent.CompletableFuture;
-
-import static org.slf4j.LoggerFactory.getLogger;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 /**
  * Implementation of gNMI client.
  */
 public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
-    private static final PathElem DUMMY_PATH_ELEM = PathElem.newBuilder().setName("onos-gnmi-test").build();
-    private static final Path DUMMY_PATH = Path.newBuilder().addElem(DUMMY_PATH_ELEM).build();
-    private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
-    private final Logger log = getLogger(getClass());
-    private final gNMIGrpc.gNMIBlockingStub blockingStub;
-    private GnmiSubscriptionManager gnmiSubscriptionManager;
+
+    private static final int RPC_TIMEOUT_SECONDS = 10;
+
+    private static final GetRequest PING_REQUEST = GetRequest.newBuilder().addPath(
+            Path.newBuilder().addElem(
+                    PathElem.newBuilder().setName("onos-gnmi-ping").build()
+            ).build()).build();
+
+    private GnmiSubscriptionManager subscribeManager;
 
     GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
         super(clientKey, managedChannel, false, controller);
-        this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
-        this.gnmiSubscriptionManager =
-                new GnmiSubscriptionManager(managedChannel, deviceId, controller);
+        this.subscribeManager =
+                new GnmiSubscriptionManager(this, deviceId, controller);
     }
 
     @Override
-    public CompletableFuture<CapabilityResponse> capability() {
-        return supplyInContext(this::doCapability, "capability");
+    public CompletableFuture<CapabilityResponse> capabilities() {
+        final CompletableFuture<CapabilityResponse> future = new CompletableFuture<>();
+        execRpc(s -> s.capabilities(
+                CapabilityRequest.getDefaultInstance(),
+                unaryObserver(future, CapabilityResponse.getDefaultInstance(),
+                              "capabilities request"))
+        );
+        return future;
     }
 
     @Override
     public CompletableFuture<GetResponse> get(GetRequest request) {
-        return supplyInContext(() -> doGet(request), "get");
+        final CompletableFuture<GetResponse> future = new CompletableFuture<>();
+        execRpc(s -> s.get(request, unaryObserver(
+                future, GetResponse.getDefaultInstance(), "GET"))
+        );
+        return future;
     }
 
     @Override
     public CompletableFuture<SetResponse> set(SetRequest request) {
-        return supplyInContext(() -> doSet(request), "set");
+        final CompletableFuture<SetResponse> future = new CompletableFuture<>();
+        execRpc(s -> s.set(request, unaryObserver(
+                future, SetResponse.getDefaultInstance(), "SET"))
+        );
+        return future;
+    }
+
+    private <RES> StreamObserver<RES> unaryObserver(
+            final CompletableFuture<RES> future,
+            final RES defaultResponse,
+            final String opDescription) {
+        return new StreamObserver<RES>() {
+            @Override
+            public void onNext(RES value) {
+                future.complete(value);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                handleRpcError(t, opDescription);
+                future.complete(defaultResponse);
+            }
+
+            @Override
+            public void onCompleted() {
+                // Ignore. Unary call.
+            }
+        };
     }
 
     @Override
-    public boolean subscribe(SubscribeRequest request) {
-        return gnmiSubscriptionManager.subscribe(request);
+    public void subscribe(SubscribeRequest request) {
+        subscribeManager.subscribe(request);
     }
 
     @Override
-    public void terminateSubscriptionChannel() {
-        gnmiSubscriptionManager.complete();
+    public void unsubscribe() {
+        subscribeManager.unsubscribe();
     }
 
     @Override
     public CompletableFuture<Boolean> probeService() {
-        return supplyInContext(this::doServiceAvailable, "isServiceAvailable");
+        final CompletableFuture<Boolean> future = new CompletableFuture<>();
+        final StreamObserver<GetResponse> responseObserver = new StreamObserver<GetResponse>() {
+            @Override
+            public void onNext(GetResponse value) {
+                future.complete(true);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                // This gRPC call should fail with an INVALID_ARGUMENT status,
+                // since the "/onos-gnmi-ping" path does not exist in any config
+                // model. Any other status code (e.g. UNIMPLEMENTED) means the
+                // gNMI service is not available on the device.
+                future.complete(Status.fromThrowable(t).getCode()
+                                        == Status.Code.INVALID_ARGUMENT);
+            }
+
+            @Override
+            public void onCompleted() {
+                // Ignore. Unary call.
+            }
+        };
+        execRpc(s -> s.get(PING_REQUEST, responseObserver));
+        return future;
     }
 
     @Override
-    protected Void doShutdown() {
-        gnmiSubscriptionManager.shutdown();
-        return super.doShutdown();
+    public void shutdown() {
+        subscribeManager.shutdown();
+        super.shutdown();
     }
 
-    private CapabilityResponse doCapability() {
-        CapabilityRequest request = CapabilityRequest.newBuilder().build();
-        try {
-            return blockingStub.capabilities(request);
-        } catch (StatusRuntimeException e) {
-            log.warn("Unable to get capability from {}: {}", deviceId, e.getMessage());
-            return CapabilityResponse.getDefaultInstance();
+    /**
+     * Forces execution of an RPC in a cancellable context with a timeout.
+     *
+     * @param stubConsumer gNMI stub consumer
+     */
+    private void execRpc(Consumer<gNMIGrpc.gNMIStub> stubConsumer) {
+        if (log.isTraceEnabled()) {
+            log.trace("Executing RPC with timeout {} seconds (context deadline {})...",
+                      RPC_TIMEOUT_SECONDS, context().getDeadline());
         }
+        runInCancellableContext(() -> stubConsumer.accept(
+                gNMIGrpc.newStub(channel)
+                        .withDeadlineAfter(RPC_TIMEOUT_SECONDS, TimeUnit.SECONDS)));
     }
 
-    private GetResponse doGet(GetRequest request) {
-        try {
-            return blockingStub.get(request);
-        } catch (StatusRuntimeException e) {
-            log.warn("Unable to get data from {}: {}", deviceId, e.getMessage());
-            return GetResponse.getDefaultInstance();
+    /**
+     * Forces execution of an RPC in a cancellable context with no timeout.
+     *
+     * @param stubConsumer gNMI stub consumer
+     */
+    void execRpcNoTimeout(Consumer<gNMIGrpc.gNMIStub> stubConsumer) {
+        if (log.isTraceEnabled()) {
+            log.trace("Executing RPC with no timeout (context deadline {})...",
+                      context().getDeadline());
         }
-    }
-
-    private SetResponse doSet(SetRequest request) {
-        try {
-            return blockingStub.set(request);
-        } catch (StatusRuntimeException e) {
-            log.warn("Unable to set data to {}: {}", deviceId, e.getMessage());
-            return SetResponse.getDefaultInstance();
-        }
-    }
-
-    private boolean doServiceAvailable() {
-        try {
-            return blockingStub.get(DUMMY_REQUEST) != null;
-        } catch (StatusRuntimeException e) {
-            // This gRPC call should throw INVALID_ARGUMENT status exception
-            // since "/onos-gnmi-test" path does not exists in any config model
-            // For other status code such as UNIMPLEMENT, means the gNMI
-            // service is not available on the device.
-            return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
-        }
+        runInCancellableContext(() -> stubConsumer.accept(
+                gNMIGrpc.newStub(channel)));
     }
 }
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
index 156cb43..73bb807 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
@@ -17,9 +17,8 @@
 package org.onosproject.gnmi.ctl;
 
 
+import com.google.common.util.concurrent.Futures;
 import gnmi.Gnmi;
-import gnmi.gNMIGrpc;
-import io.grpc.ManagedChannel;
 import io.grpc.StatusRuntimeException;
 import io.grpc.stub.ClientCallStreamObserver;
 import io.grpc.stub.StreamObserver;
@@ -29,9 +28,10 @@
 import org.slf4j.Logger;
 
 import java.net.ConnectException;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.lang.String.format;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -39,108 +39,110 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * A manager for the gNMI stream channel that opportunistically creates
- * new stream RCP stubs (e.g. when one fails because of errors) and posts
- * subscribe events via the gNMI controller.
+ * A manager for the gNMI Subscribe RPC that opportunistically starts new RPC
+ * (e.g. when one fails because of errors) and posts subscribe events via the
+ * gNMI controller.
  */
 final class GnmiSubscriptionManager {
 
-    /**
-     * The state of the subscription manager.
-     */
-    enum State {
-
-        /**
-         * Subscription not exists.
-         */
-        INIT,
-
-        /**
-         * Exists a subscription and channel opened.
-         */
-        SUBSCRIBED,
-
-        /**
-         * Exists a subscription, but the channel does not open.
-         */
-        RETRYING,
-    }
-
     // FIXME: make this configurable
     private static final long DEFAULT_RECONNECT_DELAY = 5; // Seconds
+
     private static final Logger log = getLogger(GnmiSubscriptionManager.class);
-    private final ManagedChannel channel;
+
+    private final GnmiClientImpl client;
     private final DeviceId deviceId;
     private final GnmiControllerImpl controller;
-
     private final StreamObserver<Gnmi.SubscribeResponse> responseObserver;
-    private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
+
+    private final ScheduledExecutorService streamCheckerExecutor =
+            newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-subscribe-check", "%d", log));
+    private Future<?> checkTask;
 
     private ClientCallStreamObserver<Gnmi.SubscribeRequest> requestObserver;
     private Gnmi.SubscribeRequest existingSubscription;
-    private final ScheduledExecutorService streamCheckerExecutor =
-            newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-probe", "%d", log));
+    private AtomicBoolean active = new AtomicBoolean(false);
 
-    GnmiSubscriptionManager(ManagedChannel channel, DeviceId deviceId,
+    GnmiSubscriptionManager(GnmiClientImpl client, DeviceId deviceId,
                             GnmiControllerImpl controller) {
-        this.channel = channel;
+        this.client = client;
         this.deviceId = deviceId;
         this.controller = controller;
         this.responseObserver = new InternalStreamResponseObserver();
-        streamCheckerExecutor.scheduleAtFixedRate(this::checkGnmiStream, 0,
-                                                  DEFAULT_RECONNECT_DELAY,
-                                                  TimeUnit.SECONDS);
+    }
+
+    /** Starts (or replaces) the subscription for this device and schedules the
+     *  periodic check that re-starts the Subscribe RPC when it is not active. */
+    void subscribe(Gnmi.SubscribeRequest request) {
+        synchronized (this) {
+            if (existingSubscription != null) {
+                if (existingSubscription.equals(request)) {
+                    // Nothing to do, already subscribed for the same request.
+                    log.debug("Ignoring re-subscription to same request for {}", deviceId);
+                    return;
+                }
+                log.debug("Cancelling existing subscription for {} before " +
+                                  "starting a new one", deviceId);
+                complete();
+            }
+            existingSubscription = request;
+            sendSubscribeRequest();
+            // Schedule the checker only when none is pending (checkTask is
+            // initially null and is reset to null by unsubscribe()).
+            if (checkTask == null) {
+                checkTask = streamCheckerExecutor.scheduleAtFixedRate(
+                        this::checkSubscription, 0, DEFAULT_RECONNECT_DELAY, TimeUnit.SECONDS);
+            }
+        }
+    }
+
+    void unsubscribe() {
+        synchronized (this) {
+            if (checkTask != null) {
+                checkTask.cancel(false);
+                checkTask = null;
+            }
+            existingSubscription = null;
+            complete();
+        }
     }
 
     public void shutdown() {
-        log.info("gNMI subscription manager for device {} shutdown", deviceId);
-        streamCheckerExecutor.shutdown();
-        complete();
+        log.debug("Shutting down gNMI subscription manager for {}", deviceId);
+        unsubscribe();
+        streamCheckerExecutor.shutdownNow();
     }
 
-    private void initIfRequired() {
+    private void checkSubscription() {
+        synchronized (this) {
+            if (existingSubscription != null && !active.get()) {
+                if (client.isServerReachable() || Futures.getUnchecked(client.probeService())) {
+                    log.info("Re-starting Subscribe RPC for {}...", deviceId);
+                    sendSubscribeRequest();
+                } else {
+                    log.debug("Not restarting Subscribe RPC for {}, server is NOT reachable",
+                              deviceId);
+                }
+            }
+        }
+    }
+
+    private void sendSubscribeRequest() {
         if (requestObserver == null) {
-            log.debug("Creating new stream channel for {}...", deviceId);
-            requestObserver = (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
-                    gNMIGrpc.newStub(channel).subscribe(responseObserver);
-
+            log.debug("Starting new Subscribe RPC for {}...", deviceId);
+            client.execRpcNoTimeout(
+                    s -> requestObserver =
+                            (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
+                                    s.subscribe(responseObserver)
+            );
         }
-    }
-
-    boolean subscribe(Gnmi.SubscribeRequest request) {
-        synchronized (state) {
-            if (state.get() == State.SUBSCRIBED) {
-                // Cancel subscription when we need to subscribe new thing
-                complete();
-            }
-
-            existingSubscription = request;
-            return send(request);
-        }
-    }
-
-    private boolean send(Gnmi.SubscribeRequest value) {
-        initIfRequired();
-        try {
-            requestObserver.onNext(value);
-            state.set(State.SUBSCRIBED);
-            return true;
-        } catch (Throwable ex) {
-            if (ex instanceof StatusRuntimeException) {
-                log.warn("Unable to send subscribe request to {}: {}",
-                        deviceId, ex.getMessage());
-            } else {
-                log.warn("Exception while sending subscribe request to {}",
-                        deviceId, ex);
-            }
-            state.set(State.RETRYING);
-            return false;
-        }
+        requestObserver.onNext(existingSubscription);
+        active.set(true);
     }
 
     public void complete() {
-        synchronized (state) {
-            state.set(State.INIT);
+        synchronized (this) {
+            active.set(false);
             if (requestObserver != null) {
                 requestObserver.onCompleted();
                 requestObserver.cancel("Terminated", null);
@@ -149,21 +151,8 @@
         }
     }
 
-    private void checkGnmiStream() {
-        synchronized (state) {
-            if (state.get() != State.RETRYING) {
-                // No need to retry if the state is not RETRYING
-                return;
-            }
-            log.info("Try reconnecting gNMI stream to device {}", deviceId);
-
-            complete();
-            send(existingSubscription);
-        }
-    }
-
     /**
-     * Handles messages received from the device on the stream channel.
+     * Handles messages received from the device on the Subscribe RPC.
      */
     private final class InternalStreamResponseObserver
             implements StreamObserver<Gnmi.SubscribeResponse> {
@@ -171,41 +160,51 @@
         @Override
         public void onNext(Gnmi.SubscribeResponse message) {
             try {
-                log.debug("Received message on stream channel from {}: {}",
-                        deviceId, message.toString());
-                GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
-                GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
-                controller.postEvent(event);
+                if (log.isTraceEnabled()) {
+                    log.trace("Received SubscribeResponse from {}: {}",
+                              deviceId, message.toString());
+                }
+                controller.postEvent(new GnmiEvent(GnmiEvent.Type.UPDATE, new GnmiUpdate(
+                        deviceId, message.getUpdate(), message.getSyncResponse())));
             } catch (Throwable ex) {
-                log.error("Exception while processing stream message from {}",
-                        deviceId, ex);
+                log.error("Exception processing SubscribeResponse from " + deviceId,
+                          ex);
             }
         }
 
         @Override
         public void onError(Throwable throwable) {
+            complete();
             if (throwable instanceof StatusRuntimeException) {
                 StatusRuntimeException sre = (StatusRuntimeException) throwable;
                 if (sre.getStatus().getCause() instanceof ConnectException) {
-                    log.warn("Device {} is unreachable ({})",
-                            deviceId, sre.getCause().getMessage());
+                    log.warn("{} is unreachable ({})",
+                             deviceId, sre.getCause().getMessage());
                 } else {
-                    log.warn("Received error on stream channel for {}: {}",
-                            deviceId, throwable.getMessage());
+                    log.warn("Error on Subscribe RPC for {}: {}",
+                             deviceId, throwable.getMessage());
                 }
             } else {
-                log.warn(format("Received exception on stream channel for %s",
-                        deviceId), throwable);
+                log.error(format("Exception on Subscribe RPC for %s",
+                                 deviceId), throwable);
             }
-            state.set(State.RETRYING);
         }
 
         @Override
         public void onCompleted() {
-            log.warn("Stream channel for {} has completed", deviceId);
-            state.set(State.RETRYING);
+            complete();
+            log.warn("Subscribe RPC for {} has completed", deviceId);
         }
     }
+
+    @Override
+    protected void finalize() throws Throwable {
+        if (!streamCheckerExecutor.isShutdown()) {
+            log.error("Finalizing object but executor is still active! BUG? Shutting down...");
+            shutdown();
+        }
+        super.finalize();
+    }
 }
 
 
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
index d529529..7957006 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
@@ -28,11 +28,8 @@
 
     /**
      * Shutdowns the client by terminating any active RPC.
-     *
-     * @return a completable future to signal the completion of the shutdown
-     * procedure
      */
-    CompletableFuture<Void> shutdown();
+    void shutdown();
 
     /**
      * This method provides a coarse modelling of gRPC channel {@link
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
index 117c6e3..befa334 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -26,18 +26,10 @@
 import org.onosproject.net.device.DeviceAgentEvent;
 import org.slf4j.Logger;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onlab.util.Tools.groupedThreads;
+import static java.lang.String.format;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -45,18 +37,11 @@
  */
 public abstract class AbstractGrpcClient implements GrpcClient {
 
-    // Timeout in seconds to obtain the request lock.
-    private static final int LOCK_TIMEOUT = 60;
-    private static final int DEFAULT_THREAD_POOL_SIZE = 10;
-
     protected final Logger log = getLogger(getClass());
 
-    private final Lock requestLock = new ReentrantLock();
     private final Context.CancellableContext cancellableContext =
             Context.current().withCancellation();
-    private final Executor contextExecutor;
 
-    protected final ExecutorService executorService;
     protected final DeviceId deviceId;
     protected final ManagedChannel channel;
     private final boolean persistent;
@@ -81,9 +66,6 @@
         this.channel = channel;
         this.persistent = persistent;
         this.controller = controller;
-        this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
-                "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
-        this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
 
         setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING);
     }
@@ -106,27 +88,15 @@
     }
 
     @Override
-    public CompletableFuture<Void> shutdown() {
+    public void shutdown() {
         if (cancellableContext.isCancelled()) {
             log.warn("Context is already cancelled, " +
                              "ignoring request to shutdown for {}...", deviceId);
-            return CompletableFuture.completedFuture(null);
+            return;
         }
-        return CompletableFuture.supplyAsync(this::doShutdown);
-    }
-
-    protected Void doShutdown() {
         log.warn("Shutting down client for {}...", deviceId);
         cancellableContext.cancel(new InterruptedException(
                 "Requested client shutdown"));
-        this.executorService.shutdownNow();
-        try {
-            executorService.awaitTermination(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            log.warn("Executor service didn't shutdown in time.");
-            Thread.currentThread().interrupt();
-        }
-        return null;
     }
 
     /**
@@ -152,66 +122,22 @@
         return cancellableContext;
     }
 
-    /**
-     * Equivalent of supplyWithExecutor using the gRPC context executor of this
-     * client, such that if the context is cancelled (e.g. client shutdown) the
-     * RPC is automatically cancelled.
-     *
-     * @param <U>           return type of supplier
-     * @param supplier      the supplier to be executed
-     * @param opDescription the description of this supplier
-     * @return CompletableFuture includes the result of supplier
-     * @throws IllegalStateException if client has been shut down
-     */
-    protected <U> CompletableFuture<U> supplyInContext(
-            Supplier<U> supplier, String opDescription) {
-        return supplyWithExecutor(supplier, opDescription, contextExecutor);
-    }
-
-    /**
-     * Submits a task for async execution via the given executor. All tasks
-     * submitted with this method will be executed sequentially.
-     *
-     * @param <U>           return type of supplier
-     * @param supplier      the supplier to be executed
-     * @param opDescription the description of this supplier
-     * @param executor      the executor to execute this supplier
-     * @return CompletableFuture includes the result of supplier
-     * @throws IllegalStateException if client has been shut down
-     */
-    private <U> CompletableFuture<U> supplyWithExecutor(
-            Supplier<U> supplier, String opDescription, Executor executor) {
-        if (this.cancellableContext.isCancelled()) {
-            throw new IllegalStateException("Client has been shut down");
+    protected void handleRpcError(Throwable throwable, String opDescription) {
+        if (throwable instanceof StatusRuntimeException) {
+            final StatusRuntimeException sre = (StatusRuntimeException) throwable;
+            final String logMsg;
+            if (sre.getCause() == null) {
+                logMsg = sre.getMessage();
+            } else {
+                logMsg = format("%s (%s)", sre.getMessage(), sre.getCause().toString());
+            }
+            log.warn("Error while performing {} on {}: {}",
+                     opDescription, deviceId, logMsg);
+            log.debug("", throwable);
+            return;
         }
-        return CompletableFuture.supplyAsync(() -> {
-            // TODO: explore a more relaxed locking strategy.
-            try {
-                if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
-                    log.error("LOCK TIMEOUT! This is likely a deadlock, "
-                                      + "please debug (executing {})",
-                              opDescription);
-                    throw new IllegalThreadStateException("Lock timeout");
-                }
-            } catch (InterruptedException e) {
-                log.warn("Thread interrupted while waiting for lock (executing {})",
-                         opDescription);
-                throw new IllegalStateException(e);
-            }
-            try {
-                return supplier.get();
-            } catch (StatusRuntimeException ex) {
-                log.warn("Unable to execute {} on {}: {}",
-                         opDescription, deviceId, ex.toString());
-                throw ex;
-            } catch (Throwable ex) {
-                log.error("Exception in client of {}, executing {}",
-                          deviceId, opDescription, ex);
-                throw ex;
-            } finally {
-                requestLock.unlock();
-            }
-        }, executor);
+        log.error(format("Exception while performing %s on %s",
+                         opDescription, deviceId), throwable);
     }
 
     private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
index d06e3fc..d54ef97 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
@@ -42,7 +42,6 @@
 import java.util.function.Consumer;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static java.lang.String.format;
 import static p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest.ResponseType.COOKIE_ONLY;
 
 /**
@@ -96,9 +95,9 @@
     }
 
     @Override
-    protected Void doShutdown() {
+    public void shutdown() {
         streamClient.closeSession();
-        return super.doShutdown();
+        super.shutdown();
     }
 
     @Override
@@ -191,6 +190,22 @@
         return future;
     }
 
+    @Override
+    protected void handleRpcError(Throwable throwable, String opDescription) {
+        if (throwable instanceof StatusRuntimeException) {
+            checkGrpcException((StatusRuntimeException) throwable);
+        }
+        super.handleRpcError(throwable, opDescription);
+    }
+
+    private void checkGrpcException(StatusRuntimeException sre) {
+        if (sre.getStatus().getCode() == Status.Code.PERMISSION_DENIED) {
+            // Notify upper layers that this node is not master.
+            controller.postEvent(new DeviceAgentEvent(
+                    DeviceAgentEvent.Type.NOT_MASTER, deviceId));
+        }
+    }
+
     /**
      * Returns the P4Runtime-internal device ID associated with this client.
      *
@@ -252,38 +267,4 @@
         runInCancellableContext(() -> stubConsumer.accept(
                 P4RuntimeGrpc.newStub(channel)));
     }
-
-    /**
-     * Logs the error and checks it for any condition that might be of interest
-     * for the controller.
-     *
-     * @param throwable     throwable
-     * @param opDescription operation description for logging
-     */
-    void handleRpcError(Throwable throwable, String opDescription) {
-        if (throwable instanceof StatusRuntimeException) {
-            final StatusRuntimeException sre = (StatusRuntimeException) throwable;
-            checkGrpcException(sre);
-            final String logMsg;
-            if (sre.getCause() == null) {
-                logMsg = sre.getMessage();
-            } else {
-                logMsg = format("%s (%s)", sre.getMessage(), sre.getCause().toString());
-            }
-            log.warn("Error while performing {} on {}: {}",
-                     opDescription, deviceId, logMsg);
-            log.debug("", throwable);
-            return;
-        }
-        log.error(format("Exception while performing %s on %s",
-                         opDescription, deviceId), throwable);
-    }
-
-    private void checkGrpcException(StatusRuntimeException sre) {
-        if (sre.getStatus().getCode() == Status.Code.PERMISSION_DENIED) {
-            // Notify upper layers that this node is not master.
-            controller.postEvent(new DeviceAgentEvent(
-                    DeviceAgentEvent.Type.NOT_MASTER, deviceId));
-        }
-    }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
index 1bf195c..df4844b 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
@@ -303,25 +303,10 @@
         void send(StreamMessageRequest value) {
             synchronized (this) {
                 initIfRequired();
-                doSend(value);
-            }
-        }
-
-        private void doSend(StreamMessageRequest value) {
-            try {
                 requestObserver.onNext(value);
                 // Optimistically set the session as open. In case of errors, it
                 // will be closed by the response stream observer.
                 streamChannelManager.signalOpen();
-            } catch (Throwable ex) {
-                if (ex instanceof StatusRuntimeException) {
-                    log.warn("Unable to send {} to {}: {}",
-                             value.getUpdateCase().toString(), deviceId, ex.getMessage());
-                } else {
-                    log.error("Exception while sending {} to {}: {}",
-                              value.getUpdateCase().toString(), deviceId, ex);
-                }
-                teardown();
             }
         }