New P4RuntimeClient implementation that supports batching and error reporting

The new client API supports batching and provides detailed response for
write requests (e.g. if entity already exists when inserting), which was
not possible with the old one.

This patch includes:
- New more efficient implementation of P4RuntimeClient (no more locking,
use native gRPC executor, use stub deadlines)
- Ported all codecs to new AbstractCodec-based implementation (needed to
implement codec cache in the future)
- Uses batching in P4RuntimeFlowRuleProgrammable and
P4RuntimeGroupActionProgrammable
- Minor changes to PI framework runtime classes

Change-Id: I3fac42057bb4e1389d761006a32600c786598683
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
new file mode 100644
index 0000000..353d44e
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.client;
+
+import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
+import org.onosproject.grpc.ctl.AbstractGrpcClient;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.runtime.PiPacketOperation;
+import org.onosproject.net.pi.service.PiPipeconfService;
+import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
+import org.onosproject.p4runtime.api.P4RuntimeEvent;
+import org.onosproject.p4runtime.ctl.controller.BaseEventSubject;
+import org.onosproject.p4runtime.ctl.controller.ChannelEvent;
+import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
+import p4.v1.P4RuntimeGrpc;
+import p4.v1.P4RuntimeOuterClass;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+
+/**
+ * Implementation of P4RuntimeClient.
+ */
+public final class P4RuntimeClientImpl
+        extends AbstractGrpcClient implements P4RuntimeClient {
+
+    // TODO: consider making timeouts configurable per-device via netcfg
+    /**
+     * Timeout in seconds for short/fast RPCs.
+     */
+    static final int SHORT_TIMEOUT_SECONDS = 10;
+    /**
+     * Timeout in seconds for RPCs that involve transfer of potentially large
+     * amount of data. This should be long enough to allow for network delay
+     * (e.g. to transfer large pipeline binaries over slow network).
+     */
+    static final int LONG_TIMEOUT_SECONDS = 60;
+
+    private final long p4DeviceId;
+    private final ManagedChannel channel;
+    private final P4RuntimeControllerImpl controller;
+    private final StreamClientImpl streamClient;
+    private final PipelineConfigClientImpl pipelineConfigClient;
+
+    /**
+     * Instantiates a new client with the given arguments.
+     *
+     * @param clientKey       client key
+     * @param channel         gRPC managed channel
+     * @param controller      P4Runtime controller instance
+     * @param pipeconfService pipeconf service instance
+     */
+    public P4RuntimeClientImpl(P4RuntimeClientKey clientKey,
+                               ManagedChannel channel,
+                               P4RuntimeControllerImpl controller,
+                               PiPipeconfService pipeconfService) {
+        super(clientKey);
+        // Fail fast on missing collaborators before any sub-client is created.
+        this.channel = checkNotNull(channel);
+        this.controller = checkNotNull(controller);
+        checkNotNull(pipeconfService);
+
+        this.p4DeviceId = clientKey.p4DeviceId();
+        // Sub-clients get a back-reference to this client.
+        this.streamClient = new StreamClientImpl(
+                pipeconfService, this, controller);
+        this.pipelineConfigClient = new PipelineConfigClientImpl(this);
+    }
+
+    @Override
+    protected Void doShutdown() {
+        streamClient.closeSession();
+        return super.doShutdown();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> setPipelineConfig(
+            PiPipeconf pipeconf, ByteBuffer deviceData) {
+        return pipelineConfigClient.setPipelineConfig(pipeconf, deviceData);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isPipelineConfigSet(
+            PiPipeconf pipeconf, ByteBuffer deviceData) {
+        return pipelineConfigClient.isPipelineConfigSet(pipeconf, deviceData);
+    }
+
+    @Override
+    public ReadRequest read(PiPipeconf pipeconf) {
+        return new ReadRequestImpl(this, pipeconf);
+    }
+
+    @Override
+    public void openSession() {
+        streamClient.openSession();
+    }
+
+    @Override
+    public boolean isSessionOpen() {
+        return streamClient.isSessionOpen();
+    }
+
+    @Override
+    public void closeSession() {
+        streamClient.closeSession();
+    }
+
+    @Override
+    public void runForMastership() {
+        streamClient.runForMastership();
+    }
+
+    @Override
+    public boolean isMaster() {
+        return streamClient.isMaster();
+    }
+
+    @Override
+    public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
+        streamClient.packetOut(packet, pipeconf);
+    }
+
+    @Override
+    public WriteRequest write(PiPipeconf pipeconf) {
+        return new WriteRequestImpl(this, pipeconf);
+    }
+
+    /**
+     * Returns the P4Runtime-internal device ID associated with this client.
+     *
+     * @return P4Runtime-internal device ID
+     */
+    long p4DeviceId() {
+        return this.p4DeviceId;
+    }
+
+    /**
+     * Returns the ONOS device ID associated with this client.
+     *
+     * @return ONOS device ID
+     */
+    DeviceId deviceId() {
+        return this.deviceId;
+    }
+
+    /**
+     * Returns the election ID last used in a MasterArbitrationUpdate message
+     * sent by the client to the server. No guarantees are given that this is
+     * the current election ID associated to the session, nor that the server
+     * has acknowledged this value as valid.
+     *
+     * @return election ID uint128 protobuf message
+     */
+    P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
+        return streamClient.lastUsedElectionId();
+    }
+
+    /**
+     * Runs the given consumer in a cancellable context, handing it a new
+     * P4Runtime stub on this client's channel with the given deadline.
+     *
+     * @param stubConsumer consumer that receives the stub to issue the RPC
+     * @param timeout      stub deadline in seconds
+     */
+    void execRpc(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer, int timeout) {
+        if (log.isTraceEnabled()) {
+            log.trace("Executing RPC with timeout {} seconds (context deadline {})...",
+                      timeout, context().getDeadline());
+        }
+        runInCancellableContext(() -> stubConsumer.accept(
+                P4RuntimeGrpc.newStub(channel)
+                        .withDeadlineAfter(timeout, TimeUnit.SECONDS)));
+    }
+
+    /**
+     * Runs the consumer in a cancellable context with a new P4Runtime stub that has no deadline set.
+     *
+     * @param stubConsumer consumer that receives the stub to issue the RPC
+     */
+    void execRpcNoTimeout(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer) {
+        if (log.isTraceEnabled()) {
+            log.trace("Executing RPC with no timeout (context deadline {})...",
+                      context().getDeadline());
+        }
+        runInCancellableContext(() -> stubConsumer.accept(
+                P4RuntimeGrpc.newStub(channel)));
+    }
+
+    /**
+     * Logs the error and checks it for any condition that might be of interest
+     * for the controller.
+     *
+     * @param throwable     throwable
+     * @param opDescription operation description for logging
+     */
+    void handleRpcError(Throwable throwable, String opDescription) {
+        if (!(throwable instanceof StatusRuntimeException)) {
+            // Not a gRPC status error: unexpected, log with full stack trace.
+            log.error(format("Exception while performing %s on %s",
+                             opDescription, deviceId), throwable);
+            return;
+        }
+        final StatusRuntimeException sre = (StatusRuntimeException) throwable;
+        // Let the controller know about mastership/channel problems first.
+        checkGrpcException(sre);
+        final Throwable cause = sre.getCause();
+        final String logMsg = cause == null
+                ? sre.getMessage()
+                : format("%s (%s)", sre.getMessage(), cause.toString());
+        log.warn("Error while performing {} on {}: {}",
+                 opDescription, deviceId, logMsg);
+        log.debug("", throwable);
+    }
+
+    private void checkGrpcException(StatusRuntimeException sre) {
+        final P4RuntimeEvent event;
+        switch (sre.getStatus().getCode()) {
+            case PERMISSION_DENIED:
+                // This node is not master: upper layers must be told.
+                event = new P4RuntimeEvent(P4RuntimeEvent.Type.PERMISSION_DENIED,
+                                           new BaseEventSubject(deviceId));
+                break;
+            case UNAVAILABLE:
+                // The channel might have been closed.
+                event = new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
+                                           new ChannelEvent(deviceId, ChannelEvent.Type.ERROR));
+                break;
+            default:
+                return;
+        }
+        controller.postEvent(event);
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
new file mode 100644
index 0000000..c023f26
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.client;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.TextFormat;
+import io.grpc.stub.StreamObserver;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.p4runtime.api.P4RuntimePipelineConfigClient;
+import org.onosproject.p4runtime.ctl.utils.PipeconfHelper;
+import org.slf4j.Logger;
+import p4.config.v1.P4InfoOuterClass;
+import p4.tmp.P4Config;
+import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigResponse;
+import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
+import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigResponse;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl.LONG_TIMEOUT_SECONDS;
+import static org.slf4j.LoggerFactory.getLogger;
+import static p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest.ResponseType.COOKIE_ONLY;
+import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
+
+/**
+ * Implementation of P4RuntimePipelineConfigClient. Handles pipeline
+ * config-related RPCs.
+ */
+final class PipelineConfigClientImpl implements P4RuntimePipelineConfigClient {
+
+    private static final Logger log = getLogger(PipelineConfigClientImpl.class);
+
+    private static final SetForwardingPipelineConfigResponse DEFAULT_SET_RESPONSE =
+            SetForwardingPipelineConfigResponse.getDefaultInstance();
+
+    private final P4RuntimeClientImpl client;
+
+    PipelineConfigClientImpl(P4RuntimeClientImpl client) {
+        this.client = client;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> setPipelineConfig(
+            PiPipeconf pipeconf, ByteBuffer deviceData) {
+
+        log.info("Setting pipeline config for {} to {}...",
+                 client.deviceId(), pipeconf.id());
+
+        checkNotNull(deviceData, "deviceData cannot be null");
+
+        final ForwardingPipelineConfig pipelineConfigMsg =
+                buildForwardingPipelineConfigMsg(pipeconf, deviceData);
+        if (pipelineConfigMsg == null) {
+            // Error logged in buildForwardingPipelineConfigMsg()
+            return completedFuture(false);
+        }
+
+        final SetForwardingPipelineConfigRequest requestMsg =
+                SetForwardingPipelineConfigRequest
+                        .newBuilder()
+                        .setDeviceId(client.p4DeviceId())
+                        .setElectionId(client.lastUsedElectionId())
+                        .setAction(VERIFY_AND_COMMIT)
+                        .setConfig(pipelineConfigMsg)
+                        .build();
+
+        // Completed by the observer below: true only for the expected default response.
+        final CompletableFuture<Boolean> future = new CompletableFuture<>();
+        final StreamObserver<SetForwardingPipelineConfigResponse> responseObserver =
+                new StreamObserver<SetForwardingPipelineConfigResponse>() {
+                    @Override
+                    public void onNext(SetForwardingPipelineConfigResponse value) {
+                        if (DEFAULT_SET_RESPONSE.equals(value)) {
+                            // All good, pipeline is set.
+                            future.complete(true);
+                        } else {
+                            log.warn("Received invalid SetForwardingPipelineConfigResponse " +
+                                             " from {} [{}]",
+                                     client.deviceId(), TextFormat.shortDebugString(value));
+                            future.complete(false);
+                        }
+                    }
+                    @Override
+                    public void onError(Throwable t) {
+                        client.handleRpcError(t, "SET-pipeline-config");
+                        future.complete(false);
+                    }
+                    @Override
+                    public void onCompleted() {
+                        // Ignore, unary call.
+                    }
+                };
+
+        client.execRpc(s -> s.setForwardingPipelineConfig(requestMsg, responseObserver),
+                       LONG_TIMEOUT_SECONDS);
+
+        return future;
+    }
+
+    // Builds the config message (P4Info, device config blob, cookie) for the
+    // given pipeconf; returns null if the pipeconf P4Info cannot be obtained.
+    private ForwardingPipelineConfig buildForwardingPipelineConfigMsg(
+            PiPipeconf pipeconf, ByteBuffer deviceData) {
+        final P4InfoOuterClass.P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
+        if (p4Info == null) {
+            // Problem logged by PipeconfHelper.
+            return null;
+        }
+        final ForwardingPipelineConfig.Cookie cookieMsg = ForwardingPipelineConfig.Cookie
+                .newBuilder()
+                .setCookie(pipeconf.fingerprint())
+                .build();
+        // FIXME: This is specific to PI P4Runtime implementation and should be
+        //  moved to driver.
+        final P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
+                .newBuilder()
+                .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
+                .setReassign(true)
+                // Read via a duplicate: copyFrom() would otherwise drain the
+                // caller's buffer, leaving it empty for any later use.
+                .setDeviceData(ByteString.copyFrom(deviceData.duplicate()))
+                .build();
+        return ForwardingPipelineConfig.newBuilder()
+                .setP4Info(p4Info)
+                .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
+                .setCookie(cookieMsg)
+                .build();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isPipelineConfigSet(
+            PiPipeconf pipeconf, ByteBuffer expectedDeviceData) {
+        final CompletableFuture<ForwardingPipelineConfig> reply = getPipelineCookieFromServer();
+        // Compare what the device reported with what we expect.
+        return reply.thenApply(cfg -> comparePipelineConfig(pipeconf, expectedDeviceData, cfg));
+    }
+
+    private boolean comparePipelineConfig(
+            PiPipeconf pipeconf, ByteBuffer expectedDeviceData,
+            ForwardingPipelineConfig cfgFromDevice) {
+        if (cfgFromDevice == null) {
+            return false;
+        }
+
+        final ForwardingPipelineConfig wanted = buildForwardingPipelineConfigMsg(
+                pipeconf, expectedDeviceData);
+        if (wanted == null) {
+            return false;
+        }
+
+        if (cfgFromDevice.hasCookie()) {
+            // With a cookie, only the pipeconf fingerprint is compared.
+            final long deviceCookie = cfgFromDevice.getCookie().getCookie();
+            return deviceCookie == pipeconf.fingerprint();
+        }
+
+        // No cookie.
+        log.warn("{} returned GetForwardingPipelineConfigResponse " +
+                         "with 'cookie' field unset. " +
+                         "Will try by comparing 'device_data' and 'p4_info'...",
+                 client.deviceId());
+
+        final ByteString deviceBlob = cfgFromDevice.getP4DeviceConfig();
+        final ByteString wantedBlob = wanted.getP4DeviceConfig();
+        if (!deviceBlob.isEmpty() || wantedBlob.isEmpty()) {
+            return deviceBlob.equals(wantedBlob)
+                    && cfgFromDevice.getP4Info().equals(wanted.getP4Info());
+        }
+        // Device sent no blob although one is expected. Not worth a warn or
+        // error: the blob is never updated without also changing the P4Info,
+        // so comparing just the P4Info should be enough for us.
+        log.debug("{} returned GetForwardingPipelineConfigResponse " +
+                          "with empty 'p4_device_config' field, " +
+                          "equality will be based only on P4Info",
+                  client.deviceId());
+        return cfgFromDevice.getP4Info().equals(wanted.getP4Info());
+    }
+
+    private CompletableFuture<ForwardingPipelineConfig> getPipelineCookieFromServer() {
+        final GetForwardingPipelineConfigRequest request =
+                GetForwardingPipelineConfigRequest.newBuilder()
+                        .setDeviceId(client.p4DeviceId())
+                        .setResponseType(COOKIE_ONLY)
+                        .build();
+        // Completed with the returned config, or with null on any failure.
+        final CompletableFuture<ForwardingPipelineConfig> future = new CompletableFuture<>();
+        final StreamObserver<GetForwardingPipelineConfigResponse> responseObserver =
+                new StreamObserver<GetForwardingPipelineConfigResponse>() {
+                    @Override
+                    public void onNext(GetForwardingPipelineConfigResponse value) {
+                        if (value.hasConfig()) {
+                            future.complete(value.getConfig());
+                        } else {
+                            log.warn("{} returned {} with 'config' field unset",
+                                     client.deviceId(), value.getClass().getSimpleName());
+                            future.complete(null);
+                        }
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                        client.handleRpcError(t, "GET-pipeline-config");
+                        future.complete(null);
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                        // Ignore, unary call.
+                    }
+                };
+        // Use long timeout as the device might return the full P4 blob
+        // (e.g. server does not support cookie), over a slow network.
+        client.execRpc(
+                s -> s.getForwardingPipelineConfig(request, responseObserver),
+                LONG_TIMEOUT_SECONDS);
+        return future;
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java
new file mode 100644
index 0000000..c85c7e8
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java
@@ -0,0 +1,386 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.client;
+
+import com.google.common.util.concurrent.Futures;
+import io.grpc.stub.StreamObserver;
+import org.onosproject.net.pi.model.PiActionProfileId;
+import org.onosproject.net.pi.model.PiCounterId;
+import org.onosproject.net.pi.model.PiMeterId;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiTableId;
+import org.onosproject.net.pi.runtime.PiHandle;
+import org.onosproject.p4runtime.api.P4RuntimeReadClient;
+import org.onosproject.p4runtime.ctl.codec.CodecException;
+import org.onosproject.p4runtime.ctl.utils.P4InfoBrowser;
+import org.onosproject.p4runtime.ctl.utils.PipeconfHelper;
+import org.slf4j.Logger;
+import p4.v1.P4RuntimeOuterClass;
+
+import java.util.concurrent.CompletableFuture;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl.SHORT_TIMEOUT_SECONDS;
+import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Handles the creation of P4Runtime ReadRequest and execution of the Read RPC
+ * on the server.
+ */
+public final class ReadRequestImpl implements P4RuntimeReadClient.ReadRequest {
+
+    private static final Logger log = getLogger(ReadRequestImpl.class);
+
+    private final P4RuntimeClientImpl client;
+    private final PiPipeconf pipeconf;
+    private final P4RuntimeOuterClass.ReadRequest.Builder requestMsg;
+
+    ReadRequestImpl(P4RuntimeClientImpl client, PiPipeconf pipeconf) {
+        this.client = client;
+        this.pipeconf = pipeconf;
+        this.requestMsg = P4RuntimeOuterClass.ReadRequest.newBuilder()
+                .setDeviceId(client.p4DeviceId());
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest handles(Iterable<? extends PiHandle> handles) {
+        checkNotNull(handles);
+        handles.forEach(this::handle);
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest tableEntries(Iterable<PiTableId> tableIds) {
+        checkNotNull(tableIds);
+        tableIds.forEach(this::tableEntries);
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest defaultTableEntry(Iterable<PiTableId> tableIds) {
+        checkNotNull(tableIds);
+        tableIds.forEach(this::defaultTableEntry);
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest actionProfileGroups(Iterable<PiActionProfileId> actionProfileIds) {
+        checkNotNull(actionProfileIds);
+        actionProfileIds.forEach(this::actionProfileGroups);
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest actionProfileMembers(Iterable<PiActionProfileId> actionProfileIds) {
+        checkNotNull(actionProfileIds);
+        actionProfileIds.forEach(this::actionProfileMembers);
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest counterCells(Iterable<PiCounterId> counterIds) {
+        checkNotNull(counterIds);
+        counterIds.forEach(this::counterCells);
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest directCounterCells(Iterable<PiTableId> tableIds) {
+        checkNotNull(tableIds);
+        tableIds.forEach(this::directCounterCells);
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest meterCells(Iterable<PiMeterId> meterIds) {
+        checkNotNull(meterIds);
+        meterIds.forEach(this::meterCells);
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest directMeterCells(Iterable<PiTableId> tableIds) {
+        checkNotNull(tableIds);
+        tableIds.forEach(this::directMeterCells);
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest handle(PiHandle handle) {
+        checkNotNull(handle);
+        try {
+            requestMsg.addEntities(CODECS.handle().encode(handle, null, pipeconf));
+        } catch (CodecException e) {
+            log.warn("Unable to read {} from {}: {} [{}]",
+                     handle.entityType(), client.deviceId(), e.getMessage(), handle);
+        }
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest tableEntries(PiTableId tableId) {
+        try {
+            doTableEntry(tableId, false);
+        } catch (InternalRequestException e) {
+            log.warn("Unable to read entries for table '{}' from {}: {}",
+                     tableId, client.deviceId(), e.getMessage());
+        }
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest defaultTableEntry(PiTableId tableId) {
+        try {
+            doTableEntry(tableId, true);
+        } catch (InternalRequestException e) {
+            log.warn("Unable to read default entry for table '{}' from {}: {}",
+                     tableId, client.deviceId(), e.getMessage());
+        }
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest actionProfileGroups(PiActionProfileId actionProfileId) {
+        try {
+            // The group message carries only the action profile ID.
+            final P4RuntimeOuterClass.ActionProfileGroup groupMsg =
+                    P4RuntimeOuterClass.ActionProfileGroup.newBuilder()
+                            .setActionProfileId(p4ActionProfileId(actionProfileId))
+                            .build();
+            requestMsg.addEntities(P4RuntimeOuterClass.Entity.newBuilder()
+                                           .setActionProfileGroup(groupMsg)
+                                           .build());
+        } catch (InternalRequestException e) {
+            log.warn("Unable to read groups for action profile '{}' from {}: {}",
+                     actionProfileId, client.deviceId(), e.getMessage());
+        }
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest actionProfileMembers(PiActionProfileId actionProfileId) {
+        try {
+            // The member message carries only the action profile ID.
+            final P4RuntimeOuterClass.ActionProfileMember memberMsg =
+                    P4RuntimeOuterClass.ActionProfileMember.newBuilder()
+                            .setActionProfileId(p4ActionProfileId(actionProfileId))
+                            .build();
+            requestMsg.addEntities(P4RuntimeOuterClass.Entity.newBuilder()
+                                           .setActionProfileMember(memberMsg)
+                                           .build());
+        } catch (InternalRequestException e) {
+            log.warn("Unable to read members for action profile '{}' from {}: {}",
+                     actionProfileId, client.deviceId(), e.getMessage());
+        }
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest counterCells(PiCounterId counterId) {
+        try {
+            final P4RuntimeOuterClass.CounterEntry counterMsg =
+                    P4RuntimeOuterClass.CounterEntry.newBuilder()
+                            .setCounterId(p4CounterId(counterId))
+                            .build();
+            requestMsg.addEntities(P4RuntimeOuterClass.Entity.newBuilder()
+                                           .setCounterEntry(counterMsg)
+                                           .build());
+        } catch (InternalRequestException e) {
+            log.warn("Unable to read cells for counter '{}' from {}: {}",
+                     counterId, client.deviceId(), e.getMessage());
+        }
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest meterCells(PiMeterId meterId) {
+        try {
+            final P4RuntimeOuterClass.MeterEntry meterMsg =
+                    P4RuntimeOuterClass.MeterEntry.newBuilder()
+                            .setMeterId(p4MeterId(meterId))
+                            .build();
+            requestMsg.addEntities(P4RuntimeOuterClass.Entity.newBuilder()
+                                           .setMeterEntry(meterMsg)
+                                           .build());
+        } catch (InternalRequestException e) {
+            log.warn("Unable to read cells for meter '{}' from {}: {}",
+                     meterId, client.deviceId(), e.getMessage());
+        }
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest directCounterCells(PiTableId tableId) {
+        try {
+            final P4RuntimeOuterClass.TableEntry tableMsg =
+                    P4RuntimeOuterClass.TableEntry.newBuilder()
+                            .setTableId(p4TableId(tableId))
+                            .build();
+            final P4RuntimeOuterClass.DirectCounterEntry cellsMsg =
+                    P4RuntimeOuterClass.DirectCounterEntry.newBuilder()
+                            .setTableEntry(tableMsg)
+                            .build();
+            requestMsg.addEntities(P4RuntimeOuterClass.Entity.newBuilder()
+                                           .setDirectCounterEntry(cellsMsg)
+                                           .build());
+        } catch (InternalRequestException e) {
+            log.warn("Unable to read direct counter cells for table '{}' from {}: {}",
+                     tableId, client.deviceId(), e.getMessage());
+        }
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest directMeterCells(PiTableId tableId) {
+        try {
+            final P4RuntimeOuterClass.TableEntry tableMsg =
+                    P4RuntimeOuterClass.TableEntry.newBuilder()
+                            .setTableId(p4TableId(tableId))
+                            .build();
+            final P4RuntimeOuterClass.DirectMeterEntry cellsMsg =
+                    P4RuntimeOuterClass.DirectMeterEntry.newBuilder()
+                            .setTableEntry(tableMsg)
+                            .build();
+            requestMsg.addEntities(P4RuntimeOuterClass.Entity.newBuilder()
+                                           .setDirectMeterEntry(cellsMsg)
+                                           .build());
+        } catch (InternalRequestException e) {
+            log.warn("Unable to read direct meter cells for table '{}' from {}: {}",
+                     tableId, client.deviceId(), e.getMessage());
+        }
+        return this;
+    }
+
+    /**
+     * Appends to the request a table entry entity that carries only the P4Info
+     * ID of the given table, the default-action flag and an empty counter data
+     * message.
+     *
+     * @param piTableId      table ID, must not be null
+     * @param defaultEntries value to use for the is_default_action field
+     * @throws InternalRequestException if the table ID cannot be resolved
+     */
+    private void doTableEntry(PiTableId piTableId, boolean defaultEntries)
+            throws InternalRequestException {
+        checkNotNull(piTableId);
+        final P4RuntimeOuterClass.TableEntry.Builder tableEntryBuilder =
+                P4RuntimeOuterClass.TableEntry.newBuilder();
+        tableEntryBuilder.setTableId(p4TableId(piTableId));
+        tableEntryBuilder.setIsDefaultAction(defaultEntries);
+        // NOTE(review): presumably the empty counter data asks the server to
+        // return counter values with the entries - confirm against the spec.
+        tableEntryBuilder.setCounterData(
+                P4RuntimeOuterClass.CounterData.getDefaultInstance());
+        requestMsg.addEntities(
+                P4RuntimeOuterClass.Entity.newBuilder()
+                        .setTableEntry(tableEntryBuilder.build())
+                        .build());
+    }
+
+    @Override
+    public CompletableFuture<P4RuntimeReadClient.ReadResponse> submit() {
+        final P4RuntimeOuterClass.ReadRequest readRequest = requestMsg.build();
+        log.debug("Sending read request to {} for {} entities...",
+                  client.deviceId(), readRequest.getEntitiesCount());
+        if (readRequest.getEntitiesCount() == 0) {
+            // No need to ask the server.
+            return completedFuture(ReadResponseImpl.EMPTY);
+        }
+        final CompletableFuture<P4RuntimeReadClient.ReadResponse> future =
+                new CompletableFuture<>();
+        // Instantiate response builder and let stream observer populate it.
+        final ReadResponseImpl.Builder responseBuilder =
+                ReadResponseImpl.builder(client.deviceId(), pipeconf);
+        final StreamObserver<P4RuntimeOuterClass.ReadResponse> observer =
+                new StreamObserver<P4RuntimeOuterClass.ReadResponse>() {
+                    @Override
+                    public void onNext(P4RuntimeOuterClass.ReadResponse value) {
+                        log.debug("Received read response from {} with {} entities...",
+                                  client.deviceId(), value.getEntitiesCount());
+                        value.getEntitiesList().forEach(responseBuilder::addEntity);
+                    }
+                    @Override
+                    public void onError(Throwable t) {
+                        client.handleRpcError(t, "READ");
+                        // TODO: implement parsing of trailer errors
+                        future.complete(responseBuilder.fail(t));
+                    }
+                    @Override
+                    public void onCompleted() {
+                        future.complete(responseBuilder.build());
+                    }
+                };
+        client.execRpc(s -> s.read(readRequest, observer), SHORT_TIMEOUT_SECONDS);
+        return future;
+    }
+
+    /**
+     * Submits the request and blocks until the response future completes.
+     *
+     * @return read response
+     */
+    @Override
+    public P4RuntimeReadClient.ReadResponse submitSync() {
+        final CompletableFuture<P4RuntimeReadClient.ReadResponse> pending = submit();
+        return Futures.getUnchecked(pending);
+    }
+
+    private int p4TableId(PiTableId piTableId) throws InternalRequestException {
+        try {
+            return getBrowser().tables().getByName(piTableId.id())
+                    .getPreamble().getId();
+        } catch (P4InfoBrowser.NotFoundException e) {
+            throw new InternalRequestException(e.getMessage());
+        }
+    }
+
+    private int p4ActionProfileId(PiActionProfileId piActionProfileId)
+            throws InternalRequestException {
+        try {
+            return getBrowser().actionProfiles().getByName(piActionProfileId.id())
+                    .getPreamble().getId();
+        } catch (P4InfoBrowser.NotFoundException e) {
+            throw new InternalRequestException(e.getMessage());
+        }
+    }
+
+    private int p4CounterId(PiCounterId counterId)
+            throws InternalRequestException {
+        try {
+            return getBrowser().counters().getByName(counterId.id())
+                    .getPreamble().getId();
+        } catch (P4InfoBrowser.NotFoundException e) {
+            throw new InternalRequestException(e.getMessage());
+        }
+    }
+
+    private int p4MeterId(PiMeterId meterId)
+            throws InternalRequestException {
+        try {
+            return getBrowser().meters().getByName(meterId.id())
+                    .getPreamble().getId();
+        } catch (P4InfoBrowser.NotFoundException e) {
+            throw new InternalRequestException(e.getMessage());
+        }
+    }
+
+    /**
+     * Returns the P4Info browser for the pipeconf of this request.
+     *
+     * @return P4Info browser, never null
+     * @throws InternalRequestException if no browser is available for the
+     *                                  pipeconf
+     */
+    private P4InfoBrowser getBrowser() throws InternalRequestException {
+        final P4InfoBrowser p4InfoBrowser = PipeconfHelper.getP4InfoBrowser(pipeconf);
+        if (p4InfoBrowser != null) {
+            return p4InfoBrowser;
+        }
+        throw new InternalRequestException(
+                "Unable to get a P4Info browser for pipeconf " + pipeconf.id());
+    }
+
+    /**
+     * Internal exception to signal that something went wrong when populating
+     * the request. Declared static since it uses no state of the enclosing
+     * request, which avoids carrying a hidden reference to it in each
+     * exception instance.
+     */
+    private static final class InternalRequestException extends Exception {
+
+        /**
+         * Creates a new exception with the given message.
+         *
+         * @param message description of what went wrong
+         */
+        private InternalRequestException(String message) {
+            super(message);
+        }
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadResponseImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadResponseImpl.java
new file mode 100644
index 0000000..5e57797
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadResponseImpl.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.client;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.protobuf.TextFormat;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.runtime.PiEntity;
+import org.onosproject.p4runtime.api.P4RuntimeReadClient;
+import org.onosproject.p4runtime.ctl.codec.CodecException;
+import org.slf4j.Logger;
+import p4.v1.P4RuntimeOuterClass;
+
+import java.util.Collection;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Immutable outcome of a P4Runtime Read RPC: the decoded entities, plus an
+ * explanation and a throwable when the RPC failed. Instances are created via
+ * the nested builder, which decodes the entity messages sent by the server.
+ */
+public final class ReadResponseImpl implements P4RuntimeReadClient.ReadResponse {
+
+    private static final Logger log = getLogger(ReadResponseImpl.class);
+
+    /**
+     * Shared successful response that carries no entities.
+     */
+    public static final ReadResponseImpl EMPTY = new ReadResponseImpl(
+            true, ImmutableList.of(), ImmutableListMultimap.of(), null, null);
+
+    private final boolean success;
+    private final ImmutableList<PiEntity> entities;
+    // Same entities as above, indexed by their concrete class.
+    private final ImmutableListMultimap<Class<? extends PiEntity>, PiEntity> typeToEntities;
+    private final String explanation;
+    private final Throwable throwable;
+
+    private ReadResponseImpl(
+            boolean success, ImmutableList<PiEntity> entities,
+            ImmutableListMultimap<Class<? extends PiEntity>, PiEntity> typeToEntities,
+            String explanation, Throwable throwable) {
+        this.success = success;
+        this.entities = entities;
+        this.typeToEntities = typeToEntities;
+        this.explanation = explanation;
+        this.throwable = throwable;
+    }
+
+    @Override
+    public boolean isSuccess() {
+        return this.success;
+    }
+
+    @Override
+    public Collection<PiEntity> all() {
+        return this.entities;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <E extends PiEntity> Collection<E> all(Class<E> clazz) {
+        // Entities are stored under their own class, hence every element
+        // returned for a given key is an instance of that class.
+        final ImmutableList<PiEntity> ofType = typeToEntities.get(clazz);
+        return (ImmutableList<E>) ofType;
+    }
+
+    @Override
+    public String explanation() {
+        return this.explanation;
+    }
+
+    @Override
+    public Throwable throwable() {
+        return this.throwable;
+    }
+
+    /**
+     * Returns a new response builder for the given device and pipeconf.
+     *
+     * @param deviceId device ID, used when logging
+     * @param pipeconf pipeconf passed to the codec when decoding entities
+     * @return response builder
+     */
+    static Builder builder(DeviceId deviceId, PiPipeconf pipeconf) {
+        return new Builder(deviceId, pipeconf);
+    }
+
+    /**
+     * Builder of ReadResponseImpl.
+     */
+    static final class Builder {
+
+        private final DeviceId deviceId;
+        private final PiPipeconf pipeconf;
+        private final List<PiEntity> entities = Lists.newArrayList();
+        private final ListMultimap<Class<? extends PiEntity>, PiEntity>
+                typeToEntities = ArrayListMultimap.create();
+
+        // A response is successful unless fail() is called.
+        private boolean success = true;
+        private String explanation;
+        private Throwable throwable;
+
+        private Builder(DeviceId deviceId, PiPipeconf pipeconf) {
+            this.deviceId = deviceId;
+            this.pipeconf = pipeconf;
+        }
+
+        /**
+         * Decodes the given entity message and adds the result to the
+         * response. Messages that cannot be decoded are logged and skipped.
+         *
+         * @param entityMsg P4Runtime entity message
+         */
+        void addEntity(P4RuntimeOuterClass.Entity entityMsg) {
+            final PiEntity decoded;
+            try {
+                decoded = CODECS.entity().decode(entityMsg, null, pipeconf);
+            } catch (CodecException e) {
+                log.warn("Unable to decode {} message from {}: {} [{}]",
+                         entityMsg.getEntityCase().name(), deviceId,
+                         e.getMessage(), TextFormat.shortDebugString(entityMsg));
+                return;
+            }
+            entities.add(decoded);
+            typeToEntities.put(decoded.getClass(), decoded);
+        }
+
+        /**
+         * Marks the response as failed because of the given throwable, whose
+         * message is used as explanation, and builds it. Entities added so
+         * far are kept in the response.
+         *
+         * @param throwable cause of the failure, must not be null
+         * @return failed response
+         */
+        ReadResponseImpl fail(Throwable throwable) {
+            checkNotNull(throwable);
+            this.throwable = throwable;
+            this.explanation = throwable.getMessage();
+            this.success = false;
+            return build();
+        }
+
+        /**
+         * Builds the response. A successful response without entities is
+         * always the shared {@link #EMPTY} instance.
+         *
+         * @return response
+         */
+        ReadResponseImpl build() {
+            if (!success || !entities.isEmpty()) {
+                return new ReadResponseImpl(
+                        success, ImmutableList.copyOf(entities),
+                        ImmutableListMultimap.copyOf(typeToEntities),
+                        explanation, throwable);
+            }
+            return EMPTY;
+        }
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
new file mode 100644
index 0000000..7afd97b
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
@@ -0,0 +1,404 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.client;
+
+import com.google.protobuf.TextFormat;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.runtime.PiPacketOperation;
+import org.onosproject.net.pi.service.PiPipeconfService;
+import org.onosproject.p4runtime.api.P4RuntimeEvent;
+import org.onosproject.p4runtime.api.P4RuntimeStreamClient;
+import org.onosproject.p4runtime.ctl.codec.CodecException;
+import org.onosproject.p4runtime.ctl.controller.ArbitrationUpdateEvent;
+import org.onosproject.p4runtime.ctl.controller.ChannelEvent;
+import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
+import org.onosproject.p4runtime.ctl.controller.PacketInEvent;
+import org.slf4j.Logger;
+import p4.v1.P4RuntimeOuterClass;
+import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
+import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
+
+import java.math.BigInteger;
+import java.net.ConnectException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.lang.String.format;
+import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of P4RuntimeStreamClient. Handles P4Runtime StreamChannel RPC
+ * operations, such as arbitration update and packet-in/out.
+ */
+public final class StreamClientImpl implements P4RuntimeStreamClient {
+
+    private static final Logger log = getLogger(StreamClientImpl.class);
+
+    // Added to the election ID when running for mastership.
+    private static final BigInteger ONE_THOUSAND = BigInteger.valueOf(1000);
+
+    private final P4RuntimeClientImpl client;
+    private final DeviceId deviceId;
+    private final long p4DeviceId;
+    private final PiPipeconfService pipeconfService;
+    private final P4RuntimeControllerImpl controller;
+    private final StreamChannelManager streamChannelManager = new StreamChannelManager();
+
+    // Election ID of the last arbitration update sent, 1 before any is sent.
+    private P4RuntimeOuterClass.Uint128 lastUsedElectionId = P4RuntimeOuterClass.Uint128
+            .newBuilder().setLow(1).build();
+
+    // Updated on every arbitration update received from the device.
+    private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
+
+    /**
+     * Creates a new stream client bound to the device of the given client.
+     *
+     * @param pipeconfService pipeconf service, used to decode packet-ins
+     * @param client          client used to execute RPCs
+     * @param controller      controller used to post events and to obtain
+     *                        election IDs
+     */
+    StreamClientImpl(
+            PiPipeconfService pipeconfService,
+            P4RuntimeClientImpl client,
+            P4RuntimeControllerImpl controller) {
+        this.client = client;
+        this.deviceId = client.deviceId();
+        this.p4DeviceId = client.p4DeviceId();
+        this.pipeconfService = pipeconfService;
+        this.controller = controller;
+    }
+
+    @Override
+    public void openSession() {
+        if (isSessionOpen()) {
+            log.debug("Dropping request to open session for {}, session is already open",
+                      deviceId);
+            return;
+        }
+        log.debug("Opening session for {}...", deviceId);
+        // Sending the first arbitration update creates the stream channel;
+        // the session is flagged as open only when the device replies.
+        sendMasterArbitrationUpdate(controller.newMasterElectionId(deviceId));
+
+    }
+
+    @Override
+    public boolean isSessionOpen() {
+        return streamChannelManager.isOpen();
+    }
+
+    @Override
+    public void closeSession() {
+        streamChannelManager.complete();
+    }
+
+    @Override
+    public void runForMastership() {
+        if (!isSessionOpen()) {
+            log.debug("Dropping mastership request for {}, session is closed",
+                      deviceId);
+            return;
+        }
+        // Becoming master is a race. Here we increase our chances of win, i.e.
+        // using the highest election ID, against other ONOS nodes in the
+        // cluster that are calling openSession() (which is used to start the
+        // stream RPC session, not to become master).
+        log.debug("Running for mastership on {}...", deviceId);
+        final BigInteger masterId = controller.newMasterElectionId(deviceId)
+                .add(ONE_THOUSAND);
+        sendMasterArbitrationUpdate(masterId);
+    }
+
+    @Override
+    public boolean isMaster() {
+        return streamChannelManager.isOpen() && isClientMaster.get();
+    }
+
+    @Override
+    public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
+        if (!isSessionOpen()) {
+            log.debug("Dropping packet-out request for {}, session is closed",
+                      deviceId);
+            return;
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("Sending packet-out to {}: {}", deviceId, packet);
+        }
+        try {
+            // Encode the PiPacketOperation into a PacketOut
+            final P4RuntimeOuterClass.PacketOut packetOut =
+                    CODECS.packetOut().encode(packet, null, pipeconf);
+            // Build the request
+            final StreamMessageRequest packetOutRequest = StreamMessageRequest
+                    .newBuilder().setPacket(packetOut).build();
+            // Send.
+            streamChannelManager.sendIfOpen(packetOutRequest);
+        } catch (CodecException e) {
+            log.error("Unable to send packet-out: {}", e.getMessage());
+        }
+    }
+
+    /**
+     * Sends a MasterArbitrationUpdate message with the given election ID,
+     * creating the stream channel if needed, and records the ID as the last
+     * one used.
+     *
+     * @param electionId election ID
+     */
+    private void sendMasterArbitrationUpdate(BigInteger electionId) {
+        log.debug("Sending arbitration update to {}... electionId={}",
+                  deviceId, electionId);
+        final P4RuntimeOuterClass.Uint128 idMsg = bigIntegerToUint128(electionId);
+        streamChannelManager.send(
+                StreamMessageRequest.newBuilder()
+                        .setArbitration(
+                                P4RuntimeOuterClass.MasterArbitrationUpdate
+                                        .newBuilder()
+                                        .setDeviceId(p4DeviceId)
+                                        .setElectionId(idMsg)
+                                        .build())
+                        .build());
+        lastUsedElectionId = idMsg;
+    }
+
+    /**
+     * Encodes the given value as a Uint128 message, i.e. as two longs holding
+     * the high and low 64 bits of its 16-byte big-endian representation.
+     * Non-negative values that need more than 128 bits are not supported and
+     * cause an unchecked exception.
+     *
+     * @param value value to encode
+     * @return Uint128 message
+     */
+    private P4RuntimeOuterClass.Uint128 bigIntegerToUint128(BigInteger value) {
+        final int width = Long.BYTES * 2;
+        final byte[] arr = value.toByteArray();
+        // For values in [2^127, 2^128) toByteArray() returns 17 bytes, the
+        // first one being a zero sign byte. Skip it: the magnitude still fits
+        // in 16 bytes.
+        final int offset = (arr.length == width + 1 && arr[0] == 0) ? 1 : 0;
+        final int length = arr.length - offset;
+        final ByteBuffer bb = ByteBuffer.allocate(width)
+                .put(new byte[width - length])
+                .put(arr, offset, length);
+        bb.rewind();
+        return P4RuntimeOuterClass.Uint128.newBuilder()
+                .setHigh(bb.getLong())
+                .setLow(bb.getLong())
+                .build();
+    }
+
+    /**
+     * Decodes the given Uint128 message into a non-negative BigInteger.
+     *
+     * @param value Uint128 message
+     * @return unsigned value of the message
+     */
+    private BigInteger uint128ToBigInteger(P4RuntimeOuterClass.Uint128 value) {
+        // Signum 1 makes the bytes be read as an unsigned magnitude, otherwise
+        // values with the most significant bit set would come out negative.
+        return new BigInteger(
+                1,
+                ByteBuffer.allocate(Long.BYTES * 2)
+                        .putLong(value.getHigh())
+                        .putLong(value.getLow())
+                        .array());
+    }
+
+    /**
+     * Decodes the given packet-in message using the pipeconf currently
+     * associated with the device and posts a PACKET_IN event. The message is
+     * dropped with a warning if there is no pipeconf or if decoding fails.
+     *
+     * @param packetInMsg packet-in message
+     */
+    private void handlePacketIn(P4RuntimeOuterClass.PacketIn packetInMsg) {
+        if (log.isTraceEnabled()) {
+            log.trace("Received packet-in from {}: {}", deviceId, packetInMsg);
+        }
+        // Look up the pipeconf only once, so that the value checked is the
+        // same value used for decoding.
+        final PiPipeconf pipeconf = pipeconfService.getPipeconf(deviceId)
+                .orElse(null);
+        if (pipeconf == null) {
+            log.warn("Unable to handle packet-in from {}, missing pipeconf: {}",
+                     deviceId, TextFormat.shortDebugString(packetInMsg));
+            return;
+        }
+        // Decode packet message and post event.
+        // TODO: consider implementing a cache to speed up
+        //  encoding/decoding of packet-in/out (e.g. LLDP, ARP)
+        final PiPacketOperation pktOperation;
+        try {
+            pktOperation = CODECS.packetIn().decode(
+                    packetInMsg, null, pipeconf);
+        } catch (CodecException e) {
+            log.warn("Unable to process packet-in: {}", e.getMessage());
+            return;
+        }
+        controller.postEvent(new P4RuntimeEvent(
+                P4RuntimeEvent.Type.PACKET_IN,
+                new PacketInEvent(deviceId, pktOperation)));
+    }
+
+    /**
+     * Posts an ARBITRATION_RESPONSE event and updates the mastership flag
+     * according to the status of the given message. Messages without election
+     * ID or status are ignored.
+     *
+     * @param msg arbitration update message
+     */
+    private void handleArbitrationUpdate(P4RuntimeOuterClass.MasterArbitrationUpdate msg) {
+        // From the spec...
+        // - Election_id: The stream RPC with the highest election_id is the
+        // master. Switch populates with the highest election ID it
+        // has received from all connected controllers.
+        // - Status: Switch populates this with OK for the client that is the
+        // master, and with an error status for all other connected clients (at
+        // every mastership change).
+        if (!msg.hasElectionId() || !msg.hasStatus()) {
+            return;
+        }
+        final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
+        log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
+                  deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
+        controller.postEvent(new P4RuntimeEvent(
+                P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
+                new ArbitrationUpdateEvent(deviceId, isMaster)));
+        isClientMaster.set(isMaster);
+    }
+
+    /**
+     * Returns the election ID last used in a MasterArbitrationUpdate message
+     * sent by the client to the server.
+     *
+     * @return election ID uint128 protobuf message
+     */
+    P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
+        return lastUsedElectionId;
+    }
+
+    /**
+     * A manager for the P4Runtime stream channel that opportunistically creates
+     * new stream RPC stubs (e.g. when one fails because of errors) and posts
+     * channel events via the P4Runtime controller.
+     */
+    private final class StreamChannelManager {
+
+        private final AtomicBoolean open = new AtomicBoolean(false);
+        private final StreamObserver<StreamMessageResponse> responseObserver =
+                new InternalStreamResponseObserver(this);
+        // Null when there is no active stream RPC.
+        private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
+
+        /**
+         * Sends the given message, creating the stream channel first if there
+         * is none.
+         *
+         * @param value message to send
+         */
+        void send(StreamMessageRequest value) {
+            synchronized (this) {
+                initIfRequired();
+                doSend(value);
+            }
+        }
+
+        /**
+         * Sends the given message only if the stream channel exists, otherwise
+         * the message is silently dropped.
+         *
+         * @param value message to send
+         */
+        void sendIfOpen(StreamMessageRequest value) {
+            // We do not lock here, but we ignore NPEs due to stream RPC not
+            // being active (null requestObserver). Good for frequent
+            // packet-outs.
+            try {
+                doSend(value);
+            } catch (NullPointerException e) {
+                if (requestObserver != null) {
+                    // Must be something else.
+                    throw e;
+                }
+            }
+        }
+
+        /**
+         * Passes the given message to the request observer. On any failure
+         * the error is logged and the channel is completed.
+         *
+         * @param value message to send
+         */
+        private void doSend(StreamMessageRequest value) {
+            try {
+                requestObserver.onNext(value);
+            } catch (Throwable ex) {
+                if (ex instanceof StatusRuntimeException) {
+                    log.warn("Unable to send {} to {}: {}",
+                             value.getUpdateCase().toString(), deviceId, ex.getMessage());
+                } else {
+                    log.error("Exception while sending {} to {}: {}",
+                              value.getUpdateCase().toString(), deviceId, ex);
+                }
+                complete();
+            }
+        }
+
+        /**
+         * Starts a new stream channel RPC if there is no request observer.
+         * Callers are expected to hold the lock on this manager.
+         */
+        private void initIfRequired() {
+            if (requestObserver == null) {
+                log.debug("Creating new stream channel for {}...", deviceId);
+                open.set(false);
+                client.execRpcNoTimeout(
+                        s -> requestObserver =
+                                (ClientCallStreamObserver<StreamMessageRequest>)
+                                        s.streamChannel(responseObserver)
+                );
+            }
+        }
+
+        /**
+         * Signals that the channel is closed and, if a stream RPC exists,
+         * completes and cancels it.
+         */
+        void complete() {
+            synchronized (this) {
+                signalClosed();
+                if (requestObserver != null) {
+                    requestObserver.onCompleted();
+                    requestObserver.cancel("Completed", null);
+                    requestObserver = null;
+                }
+            }
+        }
+
+        /**
+         * Flags the channel as open, posting an OPEN channel event if it was
+         * not already flagged as such.
+         */
+        void signalOpen() {
+            synchronized (this) {
+                final boolean wasOpen = open.getAndSet(true);
+                if (!wasOpen) {
+                    controller.postEvent(new P4RuntimeEvent(
+                            P4RuntimeEvent.Type.CHANNEL_EVENT,
+                            new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
+                }
+            }
+        }
+
+        /**
+         * Flags the channel as closed, posting a CLOSED channel event if it
+         * was flagged as open.
+         */
+        void signalClosed() {
+            synchronized (this) {
+                final boolean wasOpen = open.getAndSet(false);
+                if (wasOpen) {
+                    controller.postEvent(new P4RuntimeEvent(
+                            P4RuntimeEvent.Type.CHANNEL_EVENT,
+                            new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
+                }
+            }
+        }
+
+        /**
+         * Returns true if the channel is flagged as open, i.e. at least one
+         * message was received on it and it was not completed since.
+         *
+         * @return true if open, false otherwise
+         */
+        boolean isOpen() {
+            return open.get();
+        }
+    }
+
+    /**
+     * Handles messages received from the device on the stream channel.
+     */
+    private final class InternalStreamResponseObserver
+            implements StreamObserver<StreamMessageResponse> {
+
+        private final StreamChannelManager streamChannelManager;
+
+        private InternalStreamResponseObserver(
+                StreamChannelManager streamChannelManager) {
+            this.streamChannelManager = streamChannelManager;
+        }
+
+        @Override
+        public void onNext(StreamMessageResponse message) {
+            // Any message received means that the channel is open.
+            streamChannelManager.signalOpen();
+            try {
+                if (log.isTraceEnabled()) {
+                    log.trace(
+                            "Received {} from {}: {}",
+                            message.getUpdateCase(), deviceId,
+                            TextFormat.shortDebugString(message));
+                }
+                switch (message.getUpdateCase()) {
+                    case PACKET:
+                        handlePacketIn(message.getPacket());
+                        return;
+                    case ARBITRATION:
+                        handleArbitrationUpdate(message.getArbitration());
+                        return;
+                    default:
+                        log.warn("Unrecognized StreamMessageResponse from {}: {}",
+                                 deviceId, message.getUpdateCase());
+                }
+            } catch (Throwable ex) {
+                // Log and go on: failing to process one message must not
+                // propagate out of the observer.
+                log.error("Exception while processing stream message from {}",
+                          deviceId, ex);
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            if (throwable instanceof StatusRuntimeException) {
+                final StatusRuntimeException sre = (StatusRuntimeException) throwable;
+                if (sre.getStatus().getCause() instanceof ConnectException) {
+                    log.warn("{} is unreachable ({})",
+                             deviceId, sre.getCause().getMessage());
+                } else {
+                    log.warn("Error on stream channel for {}: {}",
+                             deviceId, throwable.getMessage());
+                }
+            } else {
+                log.error(format("Exception on stream channel for %s",
+                                 deviceId), throwable);
+            }
+            streamChannelManager.complete();
+        }
+
+        @Override
+        public void onCompleted() {
+            log.warn("Stream channel for {} has completed", deviceId);
+            streamChannelManager.complete();
+        }
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
new file mode 100644
index 0000000..5b5d087
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.client;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.protobuf.TextFormat;
+import io.grpc.stub.StreamObserver;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.runtime.PiEntity;
+import org.onosproject.net.pi.runtime.PiHandle;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.p4runtime.ctl.codec.CodecException;
+import org.slf4j.Logger;
+import p4.v1.P4RuntimeOuterClass;
+
+import java.util.concurrent.CompletableFuture;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl.SHORT_TIMEOUT_SECONDS;
+import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Handles the creation of P4Runtime WriteRequest and execution of the Write RPC
+ * on the server.
+ */
+final class WriteRequestImpl implements P4RuntimeWriteClient.WriteRequest {
+
+    private static final Logger log = getLogger(WriteRequestImpl.class);
+
+    private static final P4RuntimeOuterClass.WriteResponse P4RT_DEFAULT_WRITE_RESPONSE_MSG =
+            P4RuntimeOuterClass.WriteResponse.getDefaultInstance();
+
+    private final P4RuntimeClientImpl client;
+    private final PiPipeconf pipeconf;
+    // The P4Runtime WriteRequest protobuf message we need to populate.
+    private final P4RuntimeOuterClass.WriteRequest.Builder requestMsg;
+    // WriteResponse instance builder. We populate entity responses as we add new
+    // entities to this request. The status of each entity response will be
+    // set once we receive a response from the device.
+    private final WriteResponseImpl.Builder responseBuilder;
+
+    /**
+     * Creates an empty write request for the device served by the given client.
+     *
+     * @param client   client used to execute the Write RPC
+     * @param pipeconf pipeconf passed to the codecs when encoding entities
+     */
+    WriteRequestImpl(P4RuntimeClientImpl client, PiPipeconf pipeconf) {
+        this.client = checkNotNull(client);
+        this.pipeconf = checkNotNull(pipeconf);
+        this.requestMsg = P4RuntimeOuterClass.WriteRequest.newBuilder()
+                .setDeviceId(client.p4DeviceId());
+        this.responseBuilder = WriteResponseImpl.builder(client.deviceId());
+    }
+
+    @Override
+    public P4RuntimeWriteClient.WriteRequest withAtomicity(
+            P4RuntimeWriteClient.Atomicity atomicity) {
+        checkNotNull(atomicity);
+        switch (atomicity) {
+            case CONTINUE_ON_ERROR:
+                requestMsg.setAtomicity(P4RuntimeOuterClass.WriteRequest.Atomicity.CONTINUE_ON_ERROR);
+                break;
+            case ROLLBACK_ON_ERROR:
+            case DATAPLANE_ATOMIC:
+                // Not supported: codec exceptions are reported as failed
+                // write responses without being sent to the device, so we
+                // could not honor all-or-nothing semantics if the device
+                // applies the rest of the batch. Fall through.
+            default:
+                throw new UnsupportedOperationException(format(
+                        "Atomicity mode %s not supported", atomicity));
+        }
+        return this;
+    }
+
+    @Override
+    public P4RuntimeWriteClient.WriteRequest insert(PiEntity entity) {
+        return entity(entity, P4RuntimeWriteClient.UpdateType.INSERT);
+    }
+
+    @Override
+    public P4RuntimeWriteClient.WriteRequest insert(Iterable<? extends PiEntity> entities) {
+        return entities(entities, P4RuntimeWriteClient.UpdateType.INSERT);
+    }
+
+    @Override
+    public P4RuntimeWriteClient.WriteRequest modify(PiEntity entity) {
+        return entity(entity, P4RuntimeWriteClient.UpdateType.MODIFY);
+    }
+
+    @Override
+    public P4RuntimeWriteClient.WriteRequest modify(Iterable<? extends PiEntity> entities) {
+        return entities(entities, P4RuntimeWriteClient.UpdateType.MODIFY);
+    }
+
+    @Override
+    public P4RuntimeWriteClient.WriteRequest delete(Iterable<? extends PiHandle> handles) {
+        checkNotNull(handles);
+        handles.forEach(this::delete);
+        return this;
+    }
+
+    @Override
+    public P4RuntimeWriteClient.WriteRequest entities(
+            Iterable<? extends PiEntity> entities, P4RuntimeWriteClient.UpdateType updateType) {
+        checkNotNull(entities);
+        entities.forEach(e -> this.entity(e, updateType));
+        return this;
+    }
+
+    @Override
+    public P4RuntimeWriteClient.WriteRequest entity(
+            PiEntity entity, P4RuntimeWriteClient.UpdateType updateType) {
+        checkNotNull(entity);
+        checkNotNull(updateType);
+        appendToRequestMsg(updateType, entity, entity.handle(client.deviceId()));
+        return this;
+    }
+
+    @Override
+    public P4RuntimeWriteClient.WriteRequest delete(PiHandle handle) {
+        checkNotNull(handle);
+        appendToRequestMsg(P4RuntimeWriteClient.UpdateType.DELETE, null, handle);
+        return this;
+    }
+
+    @Override
+    public P4RuntimeWriteClient.WriteResponse submitSync() {
+        return Futures.getUnchecked(submit());
+    }
+
+    @Override
+    public CompletableFuture<P4RuntimeWriteClient.WriteResponse> submit() {
+        final P4RuntimeOuterClass.WriteRequest writeRequest = requestMsg
+                .setElectionId(client.lastUsedElectionId()).build();
+        log.debug("Sending write request to {} with {} updates...",
+                  client.deviceId(), writeRequest.getUpdatesCount());
+        if (writeRequest.getUpdatesCount() == 0) {
+            // No need to ask the server. Still report entities that failed
+            // encoding (if any), which are tracked only by the response builder.
+            return completedFuture(responseBuilder.buildAsIs());
+        }
+        final CompletableFuture<P4RuntimeWriteClient.WriteResponse> future = new CompletableFuture<>();
+        final StreamObserver<P4RuntimeOuterClass.WriteResponse> observer =
+                new StreamObserver<P4RuntimeOuterClass.WriteResponse>() {
+                    @Override
+                    public void onNext(P4RuntimeOuterClass.WriteResponse value) {
+                        if (!P4RT_DEFAULT_WRITE_RESPONSE_MSG.equals(value)) {
+                            log.warn("Received invalid WriteResponse message from {}: {}",
+                                     client.deviceId(), TextFormat.shortDebugString(value));
+                            // Leave all entity responses in pending state.
+                            future.complete(responseBuilder.buildAsIs());
+                        } else {
+                            log.debug("Received write response from {}...",
+                                      client.deviceId());
+                            // All good, all entities written successfully.
+                            future.complete(responseBuilder.setSuccessAllAndBuild());
+                        }
+                    }
+                    @Override
+                    public void onError(Throwable t) {
+                        client.handleRpcError(t, "WRITE");
+                        future.complete(responseBuilder.setErrorsAndBuild(t));
+                    }
+                    @Override
+                    public void onCompleted() {
+                        // Nothing to do, unary call.
+                    }
+                };
+        client.execRpc(s -> s.write(writeRequest, observer), SHORT_TIMEOUT_SECONDS);
+        return future;
+    }
+
+    /**
+     * Encodes the given update and appends it to the request message. On codec
+     * failure, records a CODEC_ERROR response instead (nothing is appended).
+     */
+    private void appendToRequestMsg(P4RuntimeWriteClient.UpdateType updateType,
+                                    PiEntity piEntity, PiHandle handle) {
+        final P4RuntimeOuterClass.Update.Type p4UpdateType;
+        final P4RuntimeOuterClass.Entity entityMsg;
+        try {
+            if (updateType.equals(P4RuntimeWriteClient.UpdateType.DELETE)) {
+                p4UpdateType = P4RuntimeOuterClass.Update.Type.DELETE;
+                entityMsg = CODECS.handle().encode(handle, null, pipeconf);
+            } else {
+                p4UpdateType = updateType == P4RuntimeWriteClient.UpdateType.INSERT
+                        ? P4RuntimeOuterClass.Update.Type.INSERT
+                        : P4RuntimeOuterClass.Update.Type.MODIFY;
+                entityMsg = CODECS.entity().encode(piEntity, null, pipeconf);
+            }
+            requestMsg.addUpdates(P4RuntimeOuterClass.Update.newBuilder()
+                                          .setEntity(entityMsg).setType(p4UpdateType).build());
+            responseBuilder.addPendingResponse(handle, piEntity, updateType);
+        } catch (CodecException e) {
+            responseBuilder.addFailedResponse(
+                    handle, piEntity, updateType, e.getMessage(),
+                    P4RuntimeWriteClient.WriteResponseStatus.CODEC_ERROR);
+        }
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
new file mode 100644
index 0000000..404ab80
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
@@ -0,0 +1,394 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.client;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.TextFormat;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.protobuf.lite.ProtoLiteUtils;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.runtime.PiEntity;
+import org.onosproject.net.pi.runtime.PiEntityType;
+import org.onosproject.net.pi.runtime.PiHandle;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteEntityResponse;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponseStatus;
+import org.slf4j.Logger;
+import p4.v1.P4RuntimeOuterClass;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Handles the creation of WriteResponse and parsing of P4Runtime errors
+ * received from server, as well as logging of RPC errors.
+ */
+final class WriteResponseImpl implements P4RuntimeWriteClient.WriteResponse {
+
+    private static final Metadata.Key<com.google.rpc.Status> STATUS_DETAILS_KEY = Metadata.Key.of(
+            "grpc-status-details-bin",
+            ProtoLiteUtils.metadataMarshaller(com.google.rpc.Status.getDefaultInstance()));
+
+    static final WriteResponseImpl EMPTY = new WriteResponseImpl(
+            ImmutableList.of(), ImmutableListMultimap.of());
+
+    private static final Logger log = getLogger(WriteResponseImpl.class);
+
+    private final ImmutableList<WriteEntityResponse> entityResponses;
+    private final ImmutableListMultimap<WriteResponseStatus, WriteEntityResponse> statusMultimap;
+
+    private WriteResponseImpl(
+            ImmutableList<WriteEntityResponse> allResponses,
+            ImmutableListMultimap<WriteResponseStatus, WriteEntityResponse> statusMultimap) {
+        this.entityResponses = allResponses;
+        this.statusMultimap = statusMultimap;
+    }
+
+    @Override
+    public boolean isSuccess() {
+        return success().size() == all().size();
+    }
+
+    @Override
+    public Collection<WriteEntityResponse> all() {
+        return entityResponses;
+    }
+
+    @Override
+    public Collection<WriteEntityResponse> success() {
+        return statusMultimap.get(WriteResponseStatus.OK);
+    }
+
+    @Override
+    public Collection<WriteEntityResponse> failed() {
+        return isSuccess()
+                ? Collections.emptyList()
+                : entityResponses.stream().filter(r -> !r.isSuccess()).collect(toList());
+    }
+
+    @Override
+    public Collection<WriteEntityResponse> status(WriteResponseStatus status) {
+        checkNotNull(status);
+        return statusMultimap.get(status);
+    }
+
+    /**
+     * Returns a new response builder for the given device.
+     *
+     * @param deviceId device ID
+     * @return response builder
+     */
+    static Builder builder(DeviceId deviceId) {
+        return new Builder(deviceId);
+    }
+
+    /**
+     * Builder of WriteResponseImpl.
+     */
+    static final class Builder {
+
+        private final DeviceId deviceId;
+        // Responses still waiting for an outcome, keyed by insertion order so that
+        // they can be matched with the i-th error detail returned by the server.
+        private final Map<Integer, WriteEntityResponseImpl> pendingResponses = Maps.newHashMap();
+        private final List<WriteEntityResponse> allResponses = Lists.newArrayList();
+        private final ListMultimap<WriteResponseStatus, WriteEntityResponse> statusMap =
+                ArrayListMultimap.create();
+
+        private Builder(DeviceId deviceId) {
+            this.deviceId = deviceId;
+        }
+
+        void addPendingResponse(PiHandle handle, PiEntity entity, UpdateType updateType) {
+            synchronized (this) {
+                final WriteEntityResponseImpl resp = new WriteEntityResponseImpl(
+                        handle, entity, updateType);
+                allResponses.add(resp);
+                pendingResponses.put(pendingResponses.size(), resp);
+            }
+        }
+
+        void addFailedResponse(PiHandle handle, PiEntity entity, UpdateType updateType,
+                               String explanation, WriteResponseStatus status) {
+            synchronized (this) {
+                final WriteEntityResponseImpl resp = new WriteEntityResponseImpl(
+                        handle, entity, updateType).withFailure(explanation, status);
+                allResponses.add(resp);
+                // Index by status as well, so that status() can find it.
+                statusMap.put(status, resp);
+            }
+        }
+
+        WriteResponseImpl buildAsIs() {
+            synchronized (this) {
+                if (!pendingResponses.isEmpty()) {
+                    log.warn("Detected partial response from {}, " +
+                                     "{} of {} total entities are in status PENDING",
+                             deviceId, pendingResponses.size(), allResponses.size());
+                }
+                return new WriteResponseImpl(
+                        ImmutableList.copyOf(allResponses), ImmutableListMultimap.copyOf(statusMap));
+            }
+        }
+
+        WriteResponseImpl setSuccessAllAndBuild() {
+            synchronized (this) {
+                pendingResponses.values().forEach(this::doSetSuccess);
+                pendingResponses.clear();
+                return buildAsIs();
+            }
+        }
+
+        WriteResponseImpl setErrorsAndBuild(Throwable throwable) {
+            synchronized (this) {
+                return doSetErrorsAndBuild(throwable);
+            }
+        }
+
+        private void setSuccess(int index) {
+            synchronized (this) {
+                final WriteEntityResponseImpl resp = pendingResponses.remove(index);
+                if (resp != null) {
+                    doSetSuccess(resp);
+                } else {
+                    log.error("Missing pending response at index {}", index);
+                }
+            }
+        }
+
+        private void doSetSuccess(WriteEntityResponseImpl resp) {
+            resp.setSuccess();
+            statusMap.put(WriteResponseStatus.OK, resp);
+        }
+
+        private void setFailure(int index, String explanation, WriteResponseStatus status) {
+            synchronized (this) {
+                final WriteEntityResponseImpl resp = pendingResponses.remove(index);
+                if (resp != null) {
+                    resp.withFailure(explanation, status);
+                    statusMap.put(status, resp);
+                    log.warn("Unable to {} {} on {}: {} {} [{}]",
+                             resp.updateType(),
+                             resp.entityType().humanReadableName(),
+                             deviceId,
+                             status, explanation,
+                             resp.entity() != null ? resp.entity() : resp.handle());
+                } else {
+                    log.error("Missing pending response at index {}", index);
+                }
+            }
+        }
+
+        /**
+         * Parses the per-update errors carried as status details by the given
+         * RPC exception, if any. Responses that cannot be resolved stay PENDING.
+         */
+        private WriteResponseImpl doSetErrorsAndBuild(Throwable throwable) {
+            if (!(throwable instanceof StatusRuntimeException)) {
+                // Leave all entity responses in pending state.
+                return buildAsIs();
+            }
+            final StatusRuntimeException sre = (StatusRuntimeException) throwable;
+            // Compare codes, not Status instances: Status does not define value
+            // equality, so a status carrying a description never equals UNKNOWN.
+            if (sre.getStatus().getCode() != Status.Code.UNKNOWN) {
+                // Error trailers expected only if status is UNKNOWN.
+                return buildAsIs();
+            }
+            // Extract error details. Trailers are nullable in StatusRuntimeException.
+            final Metadata trailers = sre.getTrailers();
+            if (trailers == null || !trailers.containsKey(STATUS_DETAILS_KEY)) {
+                log.warn("Cannot parse write error details from {}, " +
+                                 "missing status trailers in StatusRuntimeException",
+                         deviceId);
+                return buildAsIs();
+            }
+            com.google.rpc.Status status = trailers.get(STATUS_DETAILS_KEY);
+            if (status == null) {
+                log.warn("Cannot parse write error details from {}, " +
+                                 "found NULL status trailers in StatusRuntimeException",
+                         deviceId);
+                return buildAsIs();
+            }
+            final boolean reconcilable = status.getDetailsList().size() == pendingResponses.size();
+            // We expect one error for each entity...
+            if (!reconcilable) {
+                log.warn("Unable to reconcile write error details from {}, " +
+                                 "sent {} updates, but server returned {} errors",
+                         deviceId, pendingResponses.size(), status.getDetailsList().size());
+            }
+            // ...in the same order as in the request.
+            int index = 0;
+            for (Any any : status.getDetailsList()) {
+                // Set response entities only if reconcilable, otherwise log.
+                unpackP4Error(index, any, reconcilable);
+                index += 1;
+            }
+            return buildAsIs();
+        }
+
+        private void unpackP4Error(int index, Any any, boolean reconcilable) {
+            final P4RuntimeOuterClass.Error p4Error;
+            try {
+                p4Error = any.unpack(P4RuntimeOuterClass.Error.class);
+            } catch (InvalidProtocolBufferException e) {
+                final String unpackErr = format(
+                        "P4Runtime Error message format not recognized [%s]",
+                        TextFormat.shortDebugString(any));
+                if (reconcilable) {
+                    setFailure(index, unpackErr, WriteResponseStatus.OTHER_ERROR);
+                } else {
+                    log.warn(unpackErr);
+                }
+                return;
+            }
+            // Map gRPC status codes to our WriteResponseStatus codes.
+            final Status.Code p4Code = Status.fromCodeValue(p4Error.getCanonicalCode()).getCode();
+            final WriteResponseStatus ourCode;
+            switch (p4Code) {
+                case OK:
+                    if (reconcilable) {
+                        setSuccess(index);
+                    }
+                    return;
+                case NOT_FOUND:
+                    ourCode = WriteResponseStatus.NOT_FOUND;
+                    break;
+                case ALREADY_EXISTS:
+                    ourCode = WriteResponseStatus.ALREADY_EXIST;
+                    break;
+                default:
+                    ourCode = WriteResponseStatus.OTHER_ERROR;
+                    break;
+            }
+            // Put the p4Code in the explanation only if ourCode is OTHER_ERROR.
+            final String explanationCode = ourCode == WriteResponseStatus.OTHER_ERROR
+                    ? p4Code.name() + " " : "";
+            final String details = p4Error.hasDetails()
+                    ? ", " + p4Error.getDetails().toString() : "";
+            final String explanation = format(
+                    "%s%s%s (%s:%d)", explanationCode, p4Error.getMessage(),
+                    details, p4Error.getSpace(), p4Error.getCode());
+            if (reconcilable) {
+                setFailure(index, explanation, ourCode);
+            } else {
+                log.warn("P4Runtime write error: {}", explanation);
+            }
+        }
+    }
+
+    /**
+     * Internal implementation of WriteEntityResponse.
+     */
+    private static final class WriteEntityResponseImpl implements WriteEntityResponse {
+
+        private final PiHandle handle;
+        private final PiEntity entity;
+        private final UpdateType updateType;
+
+        private WriteResponseStatus status = WriteResponseStatus.PENDING;
+        private String explanation;
+        private Throwable throwable;
+
+        private WriteEntityResponseImpl(PiHandle handle, PiEntity entity, UpdateType updateType) {
+            this.handle = handle;
+            this.entity = entity;
+            this.updateType = updateType;
+        }
+
+        private WriteEntityResponseImpl withFailure(String explanation, WriteResponseStatus status) {
+            this.status = status;
+            this.explanation = explanation;
+            this.throwable = null;
+            return this;
+        }
+
+        private void setSuccess() {
+            this.status = WriteResponseStatus.OK;
+        }
+
+        @Override
+        public PiHandle handle() {
+            return handle;
+        }
+
+        @Override
+        public PiEntity entity() {
+            return entity;
+        }
+
+        @Override
+        public UpdateType updateType() {
+            return updateType;
+        }
+
+        @Override
+        public PiEntityType entityType() {
+            return handle.entityType();
+        }
+
+        @Override
+        public boolean isSuccess() {
+            return status().equals(WriteResponseStatus.OK);
+        }
+
+        @Override
+        public WriteResponseStatus status() {
+            return status;
+        }
+
+        @Override
+        public String explanation() {
+            return explanation;
+        }
+
+        @Override
+        public Throwable throwable() {
+            return throwable;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                    .add("handle", handle)
+                    .add("entity", entity)
+                    .add("updateType", updateType)
+                    .add("status", status)
+                    .add("explanation", explanation)
+                    .add("throwable", throwable)
+                    .toString();
+        }
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/package-info.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/package-info.java
new file mode 100644
index 0000000..a5614a3
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * P4Runtime client implementation classes.
+ */
+package org.onosproject.p4runtime.ctl.client;