Handling of table entry messages in P4Runtime

+ synchronized method execution in P4RuntimeClient
+ support for cancellable contexts (for client shutdown)
+ logging of sent/received messages in GrpcControllerImpl
+ minor refactorings

Change-Id: I43f0fcc263579e01957a02ef3392105aed476f33
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/EncodeException.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/EncodeException.java
new file mode 100644
index 0000000..4601f32
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/EncodeException.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+/**
+ * Signals that the proto message cannot be built.
+ */
+final class EncodeException extends Exception {
+
+    EncodeException(String explanation) {
+        super(explanation);
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4InfoBrowser.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4InfoBrowser.java
index d8edaff..3c27468 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4InfoBrowser.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4InfoBrowser.java
@@ -31,8 +31,11 @@
 import p4.config.P4InfoOuterClass.Preamble;
 import p4.config.P4InfoOuterClass.Table;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 
 /**
@@ -70,7 +73,10 @@
                     String tableName = entity.getPreamble().getName();
                     EntityBrowser<MatchField> matchFieldBrowser = new EntityBrowser<>(format(
                             "match field for table '%s'", tableName));
-                    entity.getMatchFieldsList().forEach(m -> matchFieldBrowser.add(m.getName(), m.getId(), m));
+                    entity.getMatchFieldsList().forEach(m -> {
+                        String alias = extractMatchFieldSimpleName(m.getName());
+                        matchFieldBrowser.add(m.getName(), alias, m.getId(), m);
+                    });
                     matchFields.put(tableId, matchFieldBrowser);
                 });
 
@@ -82,7 +88,7 @@
                     String actionName = entity.getPreamble().getName();
                     EntityBrowser<Action.Param> paramBrowser = new EntityBrowser<>(format(
                             "param for action '%s'", actionName));
-                    entity.getParamsList().forEach(p -> paramBrowser.add(p.getName(), p.getId(), p));
+                    entity.getParamsList().forEach(p -> paramBrowser.add(p.getName(), null, p.getId(), p));
                     actionParams.put(actionId, paramBrowser);
                 });
 
@@ -105,6 +111,19 @@
                 entity -> ctrlPktMetadatas.addWithPreamble(entity.getPreamble(), entity));
     }
 
+    private String extractMatchFieldSimpleName(String name) {
+        // Removes the leading "hdr." or other scope identifier.
+        // E.g.: "hdr.ethernet.etherType" becomes "ethernet.etherType"
+        String[] pieces = name.split("\\.");
+        if (pieces.length == 3) {
+            return pieces[1] + "." + pieces[2];
+        } else if (pieces.length == 2) {
+            return name;
+        } else {
+            throw new UnsupportedOperationException("Invalid match field name: " + name);
+        }
+    }
+
     /**
      * Returns a browser for tables.
      *
@@ -220,15 +239,22 @@
         }
 
         /**
-         * Adds the given entity identified by the given name and id.
+         * Adds the given entity identified by the given name, alias and id.
          *
          * @param name   entity name
+         * @param alias  entity alias
          * @param id     entity id
          * @param entity entity message
          */
-        void add(String name, int id, T entity) {
+        void add(String name, @Nullable String alias, int id, T entity) {
+            checkNotNull(name);
+            checkArgument(!name.isEmpty(), "Name cannot be empty");
+            checkNotNull(entity);
             names.put(name, entity);
             ids.put(id, entity);
+            if (alias != null && !alias.isEmpty()) {
+                aliases.put(alias, entity);
+            }
         }
 
         /**
@@ -238,9 +264,8 @@
          * @param entity   entity message
          */
         void addWithPreamble(Preamble preamble, T entity) {
-            names.put(preamble.getName(), entity);
-            aliases.put(preamble.getName(), entity);
-            ids.put(preamble.getId(), entity);
+            checkNotNull(preamble);
+            add(preamble.getName(), preamble.getAlias(), preamble.getId(), entity);
         }
 
         /**
@@ -268,6 +293,23 @@
         }
 
         /**
+         * Returns the entity identified by the given name or alias, if present, otherwise, throws an exception.
+         *
+         * @param name entity name or alias
+         * @return entity message
+         * @throws NotFoundException if the entity cannot be found
+         */
+        T getByNameOrAlias(String name) throws NotFoundException {
+            if (hasName(name)) {
+                return names.get(name);
+            } else if (hasAlias(name)) {
+                return aliases.get(name);
+            } else {
+                throw new NotFoundException(entityName, name);
+            }
+        }
+
+        /**
          * Returns true if the P4Info defines an entity with such alias, false otherwise.
          *
          * @param alias entity alias
@@ -319,7 +361,7 @@
     /**
      * Signals tha an entity cannot be found in the P4Info.
      */
-    public static class NotFoundException extends Exception {
+    public static final class NotFoundException extends Exception {
 
         NotFoundException(String entityName, String key) {
             super(format("No such %s in P4Info with name/alias '%s'", entityName, key));
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 3762eb1..132689a 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -17,9 +17,8 @@
 package org.onosproject.p4runtime.ctl;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.TextFormat;
 import io.grpc.Context;
 import io.grpc.ManagedChannel;
 import io.grpc.Status;
@@ -35,165 +34,130 @@
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.slf4j.Logger;
 import p4.P4RuntimeGrpc;
+import p4.P4RuntimeOuterClass.Entity;
+import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
+import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
+import p4.P4RuntimeOuterClass.PacketIn;
+import p4.P4RuntimeOuterClass.ReadRequest;
+import p4.P4RuntimeOuterClass.ReadResponse;
+import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
+import p4.P4RuntimeOuterClass.StreamMessageRequest;
+import p4.P4RuntimeOuterClass.StreamMessageResponse;
+import p4.P4RuntimeOuterClass.TableEntry;
+import p4.P4RuntimeOuterClass.Uint128;
+import p4.P4RuntimeOuterClass.Update;
+import p4.P4RuntimeOuterClass.WriteRequest;
 import p4.tmp.P4Config;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.onlab.util.ImmutableByteSequence.copyFrom;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType;
 import static org.slf4j.LoggerFactory.getLogger;
-import static p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
-import static p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
-import static p4.P4RuntimeOuterClass.PacketIn;
-import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
+import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
 import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
-import static p4.P4RuntimeOuterClass.StreamMessageRequest;
-import static p4.P4RuntimeOuterClass.StreamMessageResponse;
 import static p4.config.P4InfoOuterClass.P4Info;
 
 /**
  * Implementation of a P4Runtime client.
  */
-public class P4RuntimeClientImpl implements P4RuntimeClient {
+public final class P4RuntimeClientImpl implements P4RuntimeClient {
 
     private static final int DEADLINE_SECONDS = 15;
 
+    // FIXME: use static election ID, since mastership arbitration is not yet supported on BMv2 or Tofino.
+    private static final int ELECTION_ID = 1;
+
+    private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
+            WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
+            WriteOperationType.INSERT, Update.Type.INSERT,
+            WriteOperationType.MODIFY, Update.Type.MODIFY,
+            WriteOperationType.DELETE, Update.Type.DELETE
+    );
+
     private final Logger log = getLogger(getClass());
 
     private final DeviceId deviceId;
     private final int p4DeviceId;
     private final P4RuntimeControllerImpl controller;
     private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
-    private final P4RuntimeGrpc.P4RuntimeStub asyncStub;
-    private ExecutorService executorService;
-    private StreamObserver<StreamMessageRequest> streamRequestObserver;
-    private Context.CancellableContext streamContext;
+    private final Context.CancellableContext cancellableContext;
+    private final ExecutorService executorService;
+    private final Executor contextExecutor;
+    private final Lock writeLock = new ReentrantLock();
+    private final StreamObserver<StreamMessageRequest> streamRequestObserver;
 
 
-    P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller,
-                        ExecutorService executorService) {
+    P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller) {
         this.deviceId = deviceId;
         this.p4DeviceId = p4DeviceId;
         this.controller = controller;
-        this.executorService = executorService;
+        this.cancellableContext = Context.current().withCancellation();
+        this.executorService = Executors.newFixedThreadPool(5, groupedThreads(
+                "onos/p4runtime-client-" + deviceId.toString(),
+                deviceId.toString() + "-%d"));
+        this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
         this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
                 .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
-        this.asyncStub = P4RuntimeGrpc.newStub(channel);
+        P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
+        this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
+    }
+
+    /**
+     * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
+     * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
+     * <p>
+     * Important: Tasks submitted in parallel by different threads are forced to execute sequentially.
+     * <p>
+     */
+    private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier) {
+        return CompletableFuture.supplyAsync(() -> {
+            // TODO: explore a more relaxed locking strategy.
+            writeLock.lock();
+            try {
+                return supplier.get();
+            } finally {
+                writeLock.unlock();
+            }
+        }, contextExecutor);
     }
 
     @Override
     public CompletableFuture<Boolean> initStreamChannel() {
-        return CompletableFuture.supplyAsync(this::doInitStreamChannel, executorService);
-    }
-
-    private boolean doInitStreamChannel() {
-        if (this.streamRequestObserver == null) {
-
-            streamContext = Context.current().withCancellation();
-            streamContext.run(
-                    () -> streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver()));
-
-            // To listen for packets and other events, we need to start the RPC.
-            // Here we do it by sending a master arbitration update.
-            if (!doArbitrationUpdate()) {
-                log.warn("Unable to initialize stream channel for {}", deviceId);
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private boolean doArbitrationUpdate() {
-
-        if (streamRequestObserver == null) {
-            log.error("Null request stream observer for {}", deviceId);
-            return false;
-        }
-
-        try {
-            StreamMessageRequest initRequest = StreamMessageRequest
-                    .newBuilder()
-                    .setArbitration(MasterArbitrationUpdate
-                            .newBuilder()
-                            .setDeviceId(p4DeviceId)
-                            .build())
-                    .build();
-            streamRequestObserver.onNext(initRequest);
-            return true;
-        } catch (StatusRuntimeException e) {
-            log.warn("Arbitration update failed for {}: {}", deviceId, e);
-            return false;
-        }
+        return supplyInContext(this::doInitStreamChannel);
     }
 
     @Override
-    public CompletableFuture<Boolean> setPipelineConfig(InputStream p4info, InputStream targetConfig) {
-        return CompletableFuture.supplyAsync(() -> doSetPipelineConfig(p4info, targetConfig), executorService);
-    }
-
-    private boolean doSetPipelineConfig(InputStream p4info, InputStream targetConfig) {
-
-        log.debug("Setting pipeline config for {}", deviceId);
-
-        P4Info.Builder p4iInfoBuilder = P4Info.newBuilder();
-
-        try {
-            TextFormat.getParser().merge(new InputStreamReader(p4info),
-                    ExtensionRegistry.getEmptyRegistry(),
-                    p4iInfoBuilder);
-        } catch (IOException ex) {
-            log.warn("Unable to load p4info for {}: {}", deviceId, ex.getMessage());
-            return false;
-        }
-
-        P4Config.P4DeviceConfig deviceIdConfig;
-        try {
-            deviceIdConfig = P4Config.P4DeviceConfig
-                    .newBuilder()
-                    .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
-                    .setReassign(true)
-                    .setDeviceData(ByteString.readFrom(targetConfig))
-                    .build();
-        } catch (IOException ex) {
-            log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
-            return false;
-        }
-
-        SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
-                .newBuilder()
-                .setAction(VERIFY_AND_COMMIT)
-                .addConfigs(ForwardingPipelineConfig
-                        .newBuilder()
-                        .setDeviceId(p4DeviceId)
-                        .setP4Info(p4iInfoBuilder.build())
-                        .setP4DeviceConfig(deviceIdConfig.toByteString())
-                        .build())
-                .build();
-        try {
-            this.blockingStub.setForwardingPipelineConfig(request);
-        } catch (StatusRuntimeException ex) {
-            log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
-            return false;
-        }
-
-        return true;
+    public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
+        return supplyInContext(() -> doSetPipelineConfig(pipeconf, targetConfigExtType));
     }
 
     @Override
-    public boolean writeTableEntries(Collection<PiTableEntry> entries, WriteOperationType opType) {
-
-        throw new UnsupportedOperationException("writeTableEntries not implemented.");
+    public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
+                                                        WriteOperationType opType, PiPipeconf pipeconf) {
+        return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf));
     }
 
     @Override
-    public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId tableId) {
-
-        throw new UnsupportedOperationException("dumpTable not implemented.");
+    public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
+        return supplyInContext(() -> doDumpTable(piTableId, pipeconf));
     }
 
     @Override
@@ -227,56 +191,224 @@
         return result;
     }
 
+    /* Blocking method implementations below */
+
+    private boolean doInitStreamChannel() {
+        // To listen for packets and other events, we need to start the RPC.
+        // Here we do it by sending a master arbitration update.
+        log.info("initializing stream chanel on {}...", deviceId);
+        if (!doArbitrationUpdate()) {
+            log.warn("Unable to initialize stream channel for {}", deviceId);
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    private boolean doArbitrationUpdate() {
+        log.info("Sending arbitration update to {}...", deviceId);
+        StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
+                .setArbitration(MasterArbitrationUpdate.newBuilder()
+                                        .setDeviceId(p4DeviceId)
+                                        .build())
+                .build();
+        try {
+            streamRequestObserver.onNext(requestMsg);
+            return true;
+        } catch (StatusRuntimeException e) {
+            log.warn("Arbitration update failed for {}: {}", deviceId, e);
+            return false;
+        }
+    }
+
+    private boolean doSetPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
+
+        log.info("Setting pipeline config for {} to {} using {}...", deviceId, pipeconf.id(), targetConfigExtType);
+
+        P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
+        if (p4Info == null) {
+            // Problem logged by PipeconfHelper.
+            return false;
+        }
+
+        if (!pipeconf.extension(targetConfigExtType).isPresent()) {
+            log.warn("Missing extension {} in pipeconf {}", targetConfigExtType, pipeconf.id());
+            return false;
+        }
+
+        InputStream targetConfig = pipeconf.extension(targetConfigExtType).get();
+        P4Config.P4DeviceConfig p4DeviceConfigMsg;
+        try {
+            p4DeviceConfigMsg = P4Config.P4DeviceConfig
+                    .newBuilder()
+                    .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
+                    .setReassign(true)
+                    .setDeviceData(ByteString.readFrom(targetConfig))
+                    .build();
+        } catch (IOException ex) {
+            log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
+            return false;
+        }
+
+        SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
+                .newBuilder()
+                .setAction(VERIFY_AND_COMMIT)
+                .addConfigs(ForwardingPipelineConfig
+                                    .newBuilder()
+                                    .setDeviceId(p4DeviceId)
+                                    .setP4Info(p4Info)
+                                    .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
+                                    .build())
+                .build();
+
+        try {
+            this.blockingStub.setForwardingPipelineConfig(request);
+
+        } catch (StatusRuntimeException ex) {
+            log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
+            return false;
+        }
+
+        return true;
+    }
+
+    private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
+                                        PiPipeconf pipeconf) {
+
+        WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
+
+        Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
+                .stream()
+                .map(tableEntryMsg ->
+                             Update.newBuilder()
+                                     .setEntity(Entity.newBuilder()
+                                                        .setTableEntry(tableEntryMsg)
+                                                        .build())
+                                     .setType(UPDATE_TYPES.get(opType))
+                                     .build())
+                .collect(Collectors.toList());
+
+        if (updateMsgs.size() == 0) {
+            return true;
+        }
+
+        writeRequestBuilder
+                .setDeviceId(p4DeviceId)
+                .setElectionId(Uint128.newBuilder()
+                                       .setHigh(0)
+                                       .setLow(ELECTION_ID)
+                                       .build())
+                .addAllUpdates(updateMsgs)
+                .build();
+
+        try {
+            blockingStub.write(writeRequestBuilder.build());
+            return true;
+        } catch (StatusRuntimeException e) {
+            log.warn("Unable to write table entries ({}): {}", opType, e.getMessage());
+            return false;
+        }
+    }
+
+    private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
+
+        log.info("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
+
+        P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
+        int tableId;
+        try {
+            tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
+        } catch (P4InfoBrowser.NotFoundException e) {
+            log.warn("Unable to dump table: {}", e.getMessage());
+            return Collections.emptyList();
+        }
+
+        ReadRequest requestMsg = ReadRequest.newBuilder()
+                .setDeviceId(p4DeviceId)
+                .addEntities(Entity.newBuilder()
+                                     .setTableEntry(TableEntry.newBuilder()
+                                                            .setTableId(tableId)
+                                                            .build())
+                                     .build())
+                .build();
+
+        Iterator<ReadResponse> responses;
+        try {
+            responses = blockingStub.read(requestMsg);
+        } catch (StatusRuntimeException e) {
+            log.warn("Unable to dump table: {}", e.getMessage());
+            return Collections.emptyList();
+        }
+
+        Iterable<ReadResponse> responseIterable = () -> responses;
+        List<TableEntry> tableEntryMsgs = StreamSupport
+                .stream(responseIterable.spliterator(), false)
+                .map(ReadResponse::getEntitiesList)
+                .flatMap(List::stream)
+                .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
+                .map(Entity::getTableEntry)
+                .collect(Collectors.toList());
+
+        log.info("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
+
+        return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
+    }
+
     @Override
     public void shutdown() {
 
         log.info("Shutting down client for {}...", deviceId);
 
-        if (streamRequestObserver != null) {
-            streamRequestObserver.onCompleted();
-            streamContext.cancel(null);
-            streamContext = null;
-        }
-
-        this.executorService.shutdown();
+        writeLock.lock();
         try {
-            executorService.awaitTermination(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            log.warn("Executor service didn't shutdown in time.");
-        }
+            if (streamRequestObserver != null) {
+                streamRequestObserver.onCompleted();
+                cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
+            }
 
-        // Prevent the execution of other tasks.
-        executorService = null;
+            this.executorService.shutdown();
+            try {
+                executorService.awaitTermination(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                log.warn("Executor service didn't shutdown in time.");
+            }
+        } finally {
+            writeLock.unlock();
+        }
     }
 
+    /**
+     * Handles messages received from the device on the stream channel.
+     */
     private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
 
         @Override
         public void onNext(StreamMessageResponse message) {
+            executorService.submit(() -> doNext(message));
+        }
 
-            P4RuntimeEvent event;
+        private void doNext(StreamMessageResponse message) {
+            log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
+            switch (message.getUpdateCase()) {
+                case PACKET:
+                    // Packet-in
+                    PacketIn packetIn = message.getPacket();
+                    ImmutableByteSequence data = copyFrom(packetIn.getPayload().asReadOnlyByteBuffer());
+                    ImmutableList.Builder<ImmutableByteSequence> metadataBuilder = ImmutableList.builder();
+                    packetIn.getMetadataList().stream()
+                            .map(m -> m.getValue().asReadOnlyByteBuffer())
+                            .map(ImmutableByteSequence::copyFrom)
+                            .forEach(metadataBuilder::add);
+                    P4RuntimeEvent event = new DefaultPacketInEvent(deviceId, data, metadataBuilder.build());
+                    controller.postEvent(event);
+                    return;
 
-            if (message.getPacket().isInitialized()) {
-                // Packet-in
-                PacketIn packetIn = message.getPacket();
-                ImmutableByteSequence data = copyFrom(packetIn.getPayload().asReadOnlyByteBuffer());
-                ImmutableList.Builder<ImmutableByteSequence> metadataBuilder = ImmutableList.builder();
-                packetIn.getMetadataList().stream()
-                        .map(m -> m.getValue().asReadOnlyByteBuffer())
-                        .map(ImmutableByteSequence::copyFrom)
-                        .forEach(metadataBuilder::add);
-                event = new DefaultPacketInEvent(deviceId, data, metadataBuilder.build());
+                case ARBITRATION:
+                    throw new UnsupportedOperationException("Arbitration not implemented.");
 
-            } else if (message.getArbitration().isInitialized()) {
-                // Arbitration.
-                throw new UnsupportedOperationException("Arbitration not implemented.");
-
-            } else {
-                log.warn("Unrecognized stream message from {}: {}", deviceId, message);
-                return;
+                default:
+                    log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
             }
-
-            controller.postEvent(event);
         }
 
         @Override
@@ -293,5 +425,4 @@
             // FIXME: same concern as before.
         }
     }
-
-}
+}
\ No newline at end of file
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 06376ca..ca78652 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -44,7 +44,6 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -56,18 +55,16 @@
         extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
         implements P4RuntimeController {
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected GrpcController grpcController;
-
     private final Logger log = getLogger(getClass());
-
     private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
     private final Map<DeviceId, P4RuntimeClient> clients = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
-
     // TODO: should use a cache to delete unused locks.
     private final Map<DeviceId, ReadWriteLock> deviceLocks = Maps.newConcurrentMap();
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected GrpcController grpcController;
+
     @Activate
     public void activate() {
         log.info("Started");
@@ -117,8 +114,7 @@
             return false;
         }
 
-        P4RuntimeClient client = new P4RuntimeClientImpl(deviceId, p4DeviceId, channel, this,
-                                                         newSingleThreadExecutor());
+        P4RuntimeClient client = new P4RuntimeClientImpl(deviceId, p4DeviceId, channel, this);
 
         channelIds.put(deviceId, channelId);
         clients.put(deviceId, client);
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PipeconfHelper.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PipeconfHelper.java
new file mode 100644
index 0000000..8f5c408
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PipeconfHelper.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.TextFormat;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.slf4j.Logger;
+import p4.config.P4InfoOuterClass.P4Info;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Map;
+
+import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.P4_INFO_TEXT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Utility class to deal with pipeconfs in the context of P4Runtime.
+ */
+final class PipeconfHelper {
+
+    private static final Logger log = getLogger(PipeconfHelper.class);
+
+    // TODO: consider implementing this via a cache that expires unused browsers.
+    private static final Map<PiPipeconfId, P4InfoBrowser> BROWSERS = Maps.newConcurrentMap();
+    private static final Map<PiPipeconfId, P4Info> P4INFOS = Maps.newConcurrentMap();
+
+    private PipeconfHelper() {
+        // Static utility class: not meant to be instantiated.
+    }
+
+    /**
+     * Extracts and returns a P4Info protobuf message from the given pipeconf. Returns null if the pipeconf does not
+     * define an extension of type {@link PiPipeconf.ExtensionType#P4_INFO_TEXT}, or if it cannot be parsed.
+     *
+     * @param pipeconf pipeconf
+     * @return P4Info, or null if missing or unparsable
+     */
+    static P4Info getP4Info(PiPipeconf pipeconf) {
+        return P4INFOS.computeIfAbsent(pipeconf.id(), piPipeconfId -> {
+            if (!pipeconf.extension(P4_INFO_TEXT).isPresent()) {
+                log.warn("Missing P4Info extension in pipeconf {}", pipeconf.id());
+                return null;
+            }
+            InputStream p4InfoStream = pipeconf.extension(P4_INFO_TEXT).get();
+            // Decode with an explicit charset rather than the platform default.
+            InputStreamReader p4InfoReader =
+                    new InputStreamReader(p4InfoStream, java.nio.charset.StandardCharsets.UTF_8);
+            P4Info.Builder p4iInfoBuilder = P4Info.newBuilder();
+            try {
+                TextFormat.getParser().merge(p4InfoReader, ExtensionRegistry.getEmptyRegistry(), p4iInfoBuilder);
+            } catch (IOException ex) {
+                log.warn("Unable to parse P4Info of pipeconf {}: {}", pipeconf.id(), ex.getMessage());
+                return null;
+            }
+            return p4iInfoBuilder.build();
+        });
+    }
+
+    /**
+     * Provides the P4Info browser associated with the given pipeconf, creating it on first use. When no P4Info can
+     * be obtained for the pipeconf, null is returned.
+     *
+     * @param pipeconf pipeconf
+     * @return P4Info browser or null
+     */
+    static P4InfoBrowser getP4InfoBrowser(PiPipeconf pipeconf) {
+        return BROWSERS.computeIfAbsent(pipeconf.id(), id -> {
+            final P4Info parsedInfo = getP4Info(pipeconf);
+            if (parsedInfo != null) {
+                return new P4InfoBrowser(parsedInfo);
+            }
+            // No P4Info available: a null result records no mapping for this pipeconf.
+            return null;
+        });
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/TableEntryEncoder.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/TableEntryEncoder.java
new file mode 100644
index 0000000..68e1808
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/TableEntryEncoder.java
@@ -0,0 +1,407 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import org.onlab.util.ImmutableByteSequence;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.runtime.PiAction;
+import org.onosproject.net.pi.runtime.PiActionId;
+import org.onosproject.net.pi.runtime.PiActionParam;
+import org.onosproject.net.pi.runtime.PiActionParamId;
+import org.onosproject.net.pi.runtime.PiExactFieldMatch;
+import org.onosproject.net.pi.runtime.PiFieldMatch;
+import org.onosproject.net.pi.runtime.PiHeaderFieldId;
+import org.onosproject.net.pi.runtime.PiLpmFieldMatch;
+import org.onosproject.net.pi.runtime.PiRangeFieldMatch;
+import org.onosproject.net.pi.runtime.PiTableAction;
+import org.onosproject.net.pi.runtime.PiTableEntry;
+import org.onosproject.net.pi.runtime.PiTableId;
+import org.onosproject.net.pi.runtime.PiTernaryFieldMatch;
+import org.onosproject.net.pi.runtime.PiValidFieldMatch;
+import org.slf4j.Logger;
+import p4.P4RuntimeOuterClass.Action;
+import p4.P4RuntimeOuterClass.FieldMatch;
+import p4.P4RuntimeOuterClass.TableAction;
+import p4.P4RuntimeOuterClass.TableEntry;
+import p4.config.P4InfoOuterClass;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import static java.lang.String.format;
+import static org.onlab.util.ImmutableByteSequence.copyFrom;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Encoder of table entries, from ONOS Pi* format, to P4Runtime protobuf messages, and vice versa.
+ */
+final class TableEntryEncoder {
+
+
+    private static final Logger log = getLogger(TableEntryEncoder.class);
+
+    private static final String HEADER_PREFIX = "hdr.";
+    private static final String VALUE_OF_PREFIX = "value of ";
+    private static final String MASK_OF_PREFIX = "mask of ";
+    private static final String HIGH_RANGE_VALUE_OF_PREFIX = "high range value of ";
+    private static final String LOW_RANGE_VALUE_OF_PREFIX = "low range value of ";
+
+    // TODO: implement cache of encoded entities.
+
+    private TableEntryEncoder() {
+        // Static utility class: not meant to be instantiated.
+    }
+
+    /**
+     * Returns a collection of P4Runtime table entry protobuf messages, encoded from the given collection of PI
+     * table entries for the given pipeconf. If a PI table entry cannot be encoded, it is skipped, hence the returned
+     * collection might have a different size than the input one.
+     * <p>
+     * Please check the log for an explanation of any error that might have occurred.
+     *
+     * @param piTableEntries PI table entries
+     * @param pipeconf       PI pipeconf
+     * @return collection of P4Runtime table entry protobuf messages
+     */
+    static Collection<TableEntry> encode(Collection<PiTableEntry> piTableEntries, PiPipeconf pipeconf) {
+
+        P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
+        if (browser == null) {
+            log.error("Unable to get a P4Info browser for pipeconf {}, skipping encoding of all table entries",
+                      pipeconf.id());
+            return Collections.emptyList();
+        }
+
+        ImmutableList.Builder<TableEntry> tableEntryMsgListBuilder = ImmutableList.builder();
+
+        for (PiTableEntry piTableEntry : piTableEntries) {
+            try {
+                tableEntryMsgListBuilder.add(encodePiTableEntry(piTableEntry, browser));
+            } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
+                log.error("Unable to encode PI table entry: {}", e.getMessage());
+            }
+        }
+
+        return tableEntryMsgListBuilder.build();
+    }
+
+    /**
+     * Returns a collection of PI table entry objects, decoded from the given collection of P4Runtime table entry
+     * messages for the given pipeconf. If a table entry message cannot be decoded, it is skipped, hence the returned
+     * collection might have a different size than the input one.
+     * <p>
+     * Please check the log for an explanation of any error that might have occurred.
+     *
+     * @param tableEntryMsgs P4Runtime table entry messages
+     * @param pipeconf       PI pipeconf
+     * @return collection of PI table entry objects
+     */
+    static Collection<PiTableEntry> decode(Collection<TableEntry> tableEntryMsgs, PiPipeconf pipeconf) {
+
+        P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
+        if (browser == null) {
+            log.error("Unable to get a P4Info browser for pipeconf {}, skipping decoding of all table entries",
+                      pipeconf.id());
+            return Collections.emptyList();
+        }
+
+        ImmutableList.Builder<PiTableEntry> piTableEntryListBuilder = ImmutableList.builder();
+
+        for (TableEntry tableEntryMsg : tableEntryMsgs) {
+            try {
+                piTableEntryListBuilder.add(decodeTableEntryMsg(tableEntryMsg, browser));
+            } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
+                log.error("Unable to decode table entry message: {}", e.getMessage());
+            }
+        }
+
+        return piTableEntryListBuilder.build();
+    }
+
+    private static TableEntry encodePiTableEntry(PiTableEntry piTableEntry, P4InfoBrowser browser)
+            throws P4InfoBrowser.NotFoundException, EncodeException {
+        // Builds the TableEntry message field by field, resolving names to P4Info ids via the browser.
+        TableEntry.Builder tableEntryMsgBuilder = TableEntry.newBuilder();
+
+        P4InfoOuterClass.Table tableInfo = browser.tables().getByName(piTableEntry.table().id());
+
+        // Table id.
+        tableEntryMsgBuilder.setTableId(tableInfo.getPreamble().getId());
+
+        // Priority.
+        // FIXME: check on P4Runtime if/what is the default priority.
+        int priority = piTableEntry.priority().orElse(0);
+        tableEntryMsgBuilder.setPriority(priority);
+
+        // Controller metadata (cookie)
+        tableEntryMsgBuilder.setControllerMetadata(piTableEntry.cookie());
+
+        // Timeout: not written to the message, only reported with a warning.
+        if (piTableEntry.timeout().isPresent()) {
+            log.warn("Found PI table entry with timeout set, not supported in P4Runtime: {}", piTableEntry);
+        }
+
+        // Table action.
+        tableEntryMsgBuilder.setAction(encodePiTableAction(piTableEntry.action(), browser));
+
+        // Field matches.
+        for (PiFieldMatch piFieldMatch : piTableEntry.fieldMatches()) {
+            tableEntryMsgBuilder.addMatch(encodePiFieldMatch(piFieldMatch, tableInfo, browser));
+        }
+
+        return tableEntryMsgBuilder.build();
+    }
+
+    private static PiTableEntry decodeTableEntryMsg(TableEntry tableEntryMsg, P4InfoBrowser browser)
+            throws P4InfoBrowser.NotFoundException, EncodeException {
+        // Rebuilds a PI table entry from the message fields, resolving P4Info ids to names via the browser.
+        PiTableEntry.Builder piTableEntryBuilder = PiTableEntry.builder();
+
+        P4InfoOuterClass.Table tableInfo = browser.tables().getById(tableEntryMsg.getTableId());
+
+        // Table id.
+        piTableEntryBuilder.forTable(PiTableId.of(tableInfo.getPreamble().getName()));
+
+        // Priority.
+        piTableEntryBuilder.withPriority(tableEntryMsg.getPriority());
+
+        // Controller metadata (cookie)
+        piTableEntryBuilder.withCookie(tableEntryMsg.getControllerMetadata());
+
+        // Table action.
+        piTableEntryBuilder.withAction(decodeTableActionMsg(tableEntryMsg.getAction(), browser));
+
+        // Timeout.
+        // FIXME: how to decode table entry messages with timeout, given that the timeout value is lost after encoding?
+
+        // Field matches.
+        for (FieldMatch fieldMatchMsg : tableEntryMsg.getMatchList()) {
+            piTableEntryBuilder.withFieldMatch(decodeFieldMatchMsg(fieldMatchMsg, tableInfo, browser));
+        }
+
+        return piTableEntryBuilder.build();
+    }
+
+    private static FieldMatch encodePiFieldMatch(PiFieldMatch piFieldMatch, P4InfoOuterClass.Table tableInfo,
+                                                 P4InfoBrowser browser)
+            throws P4InfoBrowser.NotFoundException, EncodeException {
+        // Encodes a single PI field match into a FieldMatch message, using the P4Info of the given table.
+        FieldMatch.Builder fieldMatchMsgBuilder = FieldMatch.newBuilder();
+
+        // FIXME: check how field names for stacked headers are constructed in P4Runtime.
+        String fieldName = piFieldMatch.fieldId().id();
+        int tableId = tableInfo.getPreamble().getId();
+        P4InfoOuterClass.MatchField matchFieldInfo = browser.matchFields(tableId).getByNameOrAlias(fieldName);
+        String entityName = format("field match '%s' of table '%s'",
+                                   matchFieldInfo.getName(), tableInfo.getPreamble().getName());
+        int fieldId = matchFieldInfo.getId();
+        int fieldBitwidth = matchFieldInfo.getBitwidth();
+
+        fieldMatchMsgBuilder.setFieldId(fieldId);
+        // Byte-string payloads are size-checked against the P4Info bitwidth before being set.
+        switch (piFieldMatch.type()) {
+            case EXACT:
+                PiExactFieldMatch fieldMatch = (PiExactFieldMatch) piFieldMatch;
+                ByteString exactValue = ByteString.copyFrom(fieldMatch.value().asReadOnlyBuffer());
+                assertSize(VALUE_OF_PREFIX + entityName, exactValue, fieldBitwidth);
+                return fieldMatchMsgBuilder.setExact(
+                        FieldMatch.Exact
+                                .newBuilder()
+                                .setValue(exactValue)
+                                .build())
+                        .build();
+            case TERNARY:
+                PiTernaryFieldMatch ternaryMatch = (PiTernaryFieldMatch) piFieldMatch;
+                ByteString ternaryValue = ByteString.copyFrom(ternaryMatch.value().asReadOnlyBuffer());
+                ByteString ternaryMask = ByteString.copyFrom(ternaryMatch.mask().asReadOnlyBuffer());
+                assertSize(VALUE_OF_PREFIX + entityName, ternaryValue, fieldBitwidth);
+                assertSize(MASK_OF_PREFIX + entityName, ternaryMask, fieldBitwidth);
+                return fieldMatchMsgBuilder.setTernary(
+                        FieldMatch.Ternary
+                                .newBuilder()
+                                .setValue(ternaryValue)
+                                .setMask(ternaryMask)
+                                .build())
+                        .build();
+            case LPM:
+                PiLpmFieldMatch lpmMatch = (PiLpmFieldMatch) piFieldMatch;
+                ByteString lpmValue = ByteString.copyFrom(lpmMatch.value().asReadOnlyBuffer());
+                int lpmPrefixLen = lpmMatch.prefixLength();
+                assertSize(VALUE_OF_PREFIX + entityName, lpmValue, fieldBitwidth);
+                assertPrefixLen(entityName, lpmPrefixLen, fieldBitwidth);
+                return fieldMatchMsgBuilder.setLpm(
+                        FieldMatch.LPM.newBuilder()
+                                .setValue(lpmValue)
+                                .setPrefixLen(lpmPrefixLen)
+                                .build())
+                        .build();
+            case RANGE:
+                PiRangeFieldMatch rangeMatch = (PiRangeFieldMatch) piFieldMatch;
+                ByteString rangeHighValue = ByteString.copyFrom(rangeMatch.highValue().asReadOnlyBuffer());
+                ByteString rangeLowValue = ByteString.copyFrom(rangeMatch.lowValue().asReadOnlyBuffer());
+                assertSize(HIGH_RANGE_VALUE_OF_PREFIX + entityName, rangeHighValue, fieldBitwidth);
+                assertSize(LOW_RANGE_VALUE_OF_PREFIX + entityName, rangeLowValue, fieldBitwidth);
+                return fieldMatchMsgBuilder.setRange(
+                        FieldMatch.Range.newBuilder()
+                                .setHigh(rangeHighValue)
+                                .setLow(rangeLowValue)
+                                .build())
+                        .build();
+            case VALID:
+                PiValidFieldMatch validMatch = (PiValidFieldMatch) piFieldMatch;
+                return fieldMatchMsgBuilder.setValid(
+                        FieldMatch.Valid.newBuilder()
+                                .setValue(validMatch.isValid())
+                                .build())
+                        .build();
+            default:
+                throw new EncodeException(format(
+                        "Building of match type %s not implemented", piFieldMatch.type()));
+        }
+    }
+
+    private static PiFieldMatch decodeFieldMatchMsg(FieldMatch fieldMatchMsg, P4InfoOuterClass.Table tableInfo,
+                                                    P4InfoBrowser browser)
+            throws P4InfoBrowser.NotFoundException, EncodeException {
+        // Decodes a FieldMatch message into the PI field match of the corresponding type.
+        int tableId = tableInfo.getPreamble().getId();
+        String fieldMatchName = browser.matchFields(tableId).getById(fieldMatchMsg.getFieldId()).getName();
+        if (fieldMatchName.startsWith(HEADER_PREFIX)) {
+            fieldMatchName = fieldMatchName.substring(HEADER_PREFIX.length());
+        }
+        // The remaining name must have exactly two dot-separated pieces, passed to PiHeaderFieldId.of().
+        // FIXME: Add support for decoding of stacked header names.
+        String[] pieces = fieldMatchName.split("\\.");
+        if (pieces.length != 2) {
+            throw new EncodeException(format("unrecognized field match name '%s'", fieldMatchName));
+        }
+        PiHeaderFieldId headerFieldId = PiHeaderFieldId.of(pieces[0], pieces[1]);
+
+        FieldMatch.FieldMatchTypeCase typeCase = fieldMatchMsg.getFieldMatchTypeCase();
+
+        switch (typeCase) {
+            case EXACT:
+                FieldMatch.Exact exactFieldMatch = fieldMatchMsg.getExact();
+                ImmutableByteSequence exactValue = copyFrom(exactFieldMatch.getValue().asReadOnlyByteBuffer());
+                return new PiExactFieldMatch(headerFieldId, exactValue);
+            case TERNARY:
+                FieldMatch.Ternary ternaryFieldMatch = fieldMatchMsg.getTernary();
+                ImmutableByteSequence ternaryValue = copyFrom(ternaryFieldMatch.getValue().asReadOnlyByteBuffer());
+                ImmutableByteSequence ternaryMask = copyFrom(ternaryFieldMatch.getMask().asReadOnlyByteBuffer());
+                return new PiTernaryFieldMatch(headerFieldId, ternaryValue, ternaryMask);
+            case LPM:
+                FieldMatch.LPM lpmFieldMatch = fieldMatchMsg.getLpm();
+                ImmutableByteSequence lpmValue = copyFrom(lpmFieldMatch.getValue().asReadOnlyByteBuffer());
+                int lpmPrefixLen = lpmFieldMatch.getPrefixLen();
+                return new PiLpmFieldMatch(headerFieldId, lpmValue, lpmPrefixLen);
+            case RANGE:
+                FieldMatch.Range rangeFieldMatch = fieldMatchMsg.getRange();
+                ImmutableByteSequence rangeHighValue = copyFrom(rangeFieldMatch.getHigh().asReadOnlyByteBuffer());
+                ImmutableByteSequence rangeLowValue = copyFrom(rangeFieldMatch.getLow().asReadOnlyByteBuffer());
+                return new PiRangeFieldMatch(headerFieldId, rangeLowValue, rangeHighValue);
+            case VALID:
+                FieldMatch.Valid validFieldMatch = fieldMatchMsg.getValid();
+                return new PiValidFieldMatch(headerFieldId, validFieldMatch.getValue());
+            default:
+                throw new EncodeException(format(
+                        "Decoding of field match type '%s' not implemented", typeCase.name()));
+        }
+    }
+
+    private static TableAction encodePiTableAction(PiTableAction piTableAction, P4InfoBrowser browser)
+            throws P4InfoBrowser.NotFoundException, EncodeException {
+        // Encodes a PI table action into a TableAction message; only type ACTION is handled.
+        TableAction.Builder tableActionMsgBuilder = TableAction.newBuilder();
+
+        switch (piTableAction.type()) {
+            case ACTION:
+                PiAction piAction = (PiAction) piTableAction;
+                int actionId = browser.actions().getByName(piAction.id().name()).getPreamble().getId();
+
+                Action.Builder actionMsgBuilder = Action.newBuilder().setActionId(actionId);
+
+                for (PiActionParam p : piAction.parameters()) {
+                    P4InfoOuterClass.Action.Param paramInfo = browser.actionParams(actionId).getByName(p.id().name());
+                    ByteString paramValue = ByteString.copyFrom(p.value().asReadOnlyBuffer());
+                    assertSize(format("param '%s' of action '%s'", p.id(), piAction.id()),
+                               paramValue, paramInfo.getBitwidth());
+                    actionMsgBuilder.addParams(Action.Param.newBuilder()
+                                                       .setParamId(paramInfo.getId())
+                                                       .setValue(paramValue)
+                                                       .build());
+                }
+
+                tableActionMsgBuilder.setAction(actionMsgBuilder.build());
+                break;
+
+            default:
+                throw new EncodeException(
+                        format("Building of table action type %s not implemented", piTableAction.type()));
+        }
+
+        return tableActionMsgBuilder.build();
+    }
+
+    private static PiTableAction decodeTableActionMsg(TableAction tableActionMsg, P4InfoBrowser browser)
+            throws P4InfoBrowser.NotFoundException, EncodeException {
+        // Decodes a TableAction message into a PI table action; only type case ACTION is handled.
+        TableAction.TypeCase typeCase = tableActionMsg.getTypeCase();
+
+        switch (typeCase) {
+            case ACTION:
+                PiAction.Builder piActionBuilder = PiAction.builder();
+                Action actionMsg = tableActionMsg.getAction();
+                // Action ID.
+                int actionId = actionMsg.getActionId();
+                String actionName = browser.actions().getById(actionId).getPreamble().getName();
+                piActionBuilder.withId(PiActionId.of(actionName));
+                // Params.
+                for (Action.Param paramMsg : actionMsg.getParamsList()) {
+                    String paramName = browser.actionParams(actionId).getById(paramMsg.getParamId()).getName();
+                    ImmutableByteSequence paramValue = copyFrom(paramMsg.getValue().asReadOnlyByteBuffer());
+                    piActionBuilder.withParameter(new PiActionParam(PiActionParamId.of(paramName), paramValue));
+                }
+                return piActionBuilder.build();
+
+            default:
+                throw new EncodeException(
+                        format("Decoding of table action type %s not implemented", typeCase.name()));
+        }
+    }
+
+    private static void assertSize(String entityDescr, ByteString value, int bitWidth)
+            throws EncodeException {
+        // Bytes needed to hold bitWidth bits; integer math avoids the rounding errors of float division.
+        int byteWidth = bitWidth / 8 + (bitWidth % 8 == 0 ? 0 : 1);
+        if (value.size() != byteWidth) {
+            throw new EncodeException(format("Wrong size for %s, expected %d bytes, but found %d",
+                                             entityDescr, byteWidth, value.size()));
+        }
+    }
+
+    private static void assertPrefixLen(String entityDescr, int prefixLength, int bitWidth)
+            throws EncodeException {
+        // A valid prefix length lies in [0, bitWidth]; negative values are rejected as well.
+        if (prefixLength < 0 || prefixLength > bitWidth) {
+            throw new EncodeException(format(
+                    "wrong prefix length for %s, field size is %d bits, but found one is %d",
+                    entityDescr, bitWidth, prefixLength));
+        }
+    }
+}