Implemented support for P4Runtime counter read

And PortStatisticsDiscovery behaviour for default.p4 that uses it

Change-Id: Iadf40eb322987ef74239120e01acb4bece712aef
diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiCounterCellData.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiCounterCellData.java
new file mode 100644
index 0000000..f0aada5
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiCounterCellData.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.runtime;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+
+/**
+ * Data of a counter cell of a protocol-independent pipeline.
+ */
+@Beta
+public final class PiCounterCellData {
+
+    private final PiCounterCellId cellId;
+    private final long packets;
+    private final long bytes;
+
+    /**
+     * Creates a new counter cell data for the given cell identifier, number of packets and bytes.
+     *
+     * @param cellId  counter cell identifier
+     * @param packets number of packets
+     * @param bytes   number of bytes
+     */
+    public PiCounterCellData(PiCounterCellId cellId, long packets, long bytes) {
+        this.cellId = cellId;
+        this.packets = packets;
+        this.bytes = bytes;
+    }
+
+    /**
+     * Returns the cell identifier.
+     *
+     * @return cell identifier
+     */
+    public PiCounterCellId cellId() {
+        return cellId;
+    }
+
+    /**
+     * Returns the packet count value contained by this cell.
+     *
+     * @return number of packets
+     */
+    public long packets() {
+        return packets;
+    }
+
+    /**
+     * Returns the byte count value contained by this cell.
+     *
+     * @return number of bytes
+     */
+    public long bytes() {
+        return bytes;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof PiCounterCellData)) {
+            return false;
+        }
+        PiCounterCellData that = (PiCounterCellData) o;
+        return packets == that.packets &&
+                bytes == that.bytes &&
+                Objects.equal(cellId, that.cellId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(cellId, packets, bytes);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("cellId", cellId)
+                .add("packets", packets)
+                .add("bytes", bytes)
+                .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiCounterCellId.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiCounterCellId.java
new file mode 100644
index 0000000..d762f23
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiCounterCellId.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.runtime;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Objects;
+import org.onlab.util.Identifier;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Identifier of a counter cell of a protocol-independent pipeline.
+ */
+@Beta
+public final class PiCounterCellId extends Identifier<String> {
+
+    private final PiCounterId counterId;
+    private final long index;
+
+    private PiCounterCellId(PiCounterId counterId, long index) {
+        super(counterId.id() + "[" + index + "]");
+        this.counterId = counterId;
+        this.index = index;
+    }
+
+    /**
+     * Returns a counter cell identifier for the given counter identifier and index.
+     *
+     * @param counterId counter identifier
+     * @param index     index
+     * @return counter cell identifier
+     */
+    public static PiCounterCellId of(PiCounterId counterId, long index) {
+        checkNotNull(counterId);
+        checkArgument(index >= 0, "Index must be a positive integer");
+        return new PiCounterCellId(counterId, index);
+    }
+
+    /**
+     * Returns the counter identifier of this cell.
+     *
+     * @return counter identifier
+     */
+    public PiCounterId counterId() {
+        return counterId;
+    }
+
+    /**
+     * Returns the index of this cell.
+     *
+     * @return cell index
+     */
+    public long index() {
+        return index;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof PiCounterCellId)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        PiCounterCellId that = (PiCounterCellId) o;
+        return index == that.index &&
+                Objects.equal(counterId, that.counterId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(super.hashCode(), counterId, index);
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiCounterId.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiCounterId.java
new file mode 100644
index 0000000..6fcd55e
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiCounterId.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.runtime;
+
+import com.google.common.annotations.Beta;
+import org.onlab.util.Identifier;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Identifier of a counter of a protocol-independent pipeline.
+ */
+@Beta
+public final class PiCounterId extends Identifier<String> {
+
+    private PiCounterId(String name) {
+        super(name);
+    }
+
+    /**
+     * Returns a counter identifier for the given name.
+     *
+     * @param name counter name
+     * @return counter identifier
+     */
+    public static PiCounterId of(String name) {
+        checkNotNull(name);
+        checkArgument(!name.isEmpty(), "Name name can't be empty");
+        return new PiCounterId(name);
+    }
+
+    /**
+     * Returns the name of the counter.
+     *
+     * @return counter name
+     */
+    public String name() {
+        return this.identifier;
+    }
+}
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java
index 3dba8b6..41fe160 100644
--- a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java
@@ -19,6 +19,7 @@
 import org.onosproject.bmv2.model.Bmv2PipelineModelParser;
 import org.onosproject.driver.pipeline.DefaultSingleTablePipeline;
 import org.onosproject.drivers.p4runtime.DefaultP4Interpreter;
+import org.onosproject.drivers.p4runtime.DefaultP4PortStatisticsDiscovery;
 import org.onosproject.net.behaviour.Pipeliner;
 import org.onosproject.net.device.PortStatisticsDiscovery;
 import org.onosproject.net.pi.model.DefaultPiPipeconf;
@@ -60,7 +61,7 @@
                 .withPipelineModel(Bmv2PipelineModelParser.parse(jsonUrl))
                 .addBehaviour(PiPipelineInterpreter.class, DefaultP4Interpreter.class)
                 .addBehaviour(Pipeliner.class, DefaultSingleTablePipeline.class)
-                .addBehaviour(PortStatisticsDiscovery.class, Bmv2DefaultPortStatisticsDiscovery.class)
+                .addBehaviour(PortStatisticsDiscovery.class, DefaultP4PortStatisticsDiscovery.class)
                 .addExtension(P4_INFO_TEXT, p4InfoUrl)
                 .addExtension(BMV2_JSON, jsonUrl)
                 .build();
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPortStatisticsDiscovery.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPortStatisticsDiscovery.java
deleted file mode 100644
index 96081be..0000000
--- a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPortStatisticsDiscovery.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.drivers.bmv2;
-
-import com.google.common.collect.ImmutableList;
-import org.onosproject.net.device.PortStatistics;
-import org.onosproject.net.device.PortStatisticsDiscovery;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-
-/**
- * Implementation of the behaviour for discovering the port statistics of a Bmv2 device with the default.p4 program.
- */
-public class Bmv2DefaultPortStatisticsDiscovery extends AbstractHandlerBehaviour implements PortStatisticsDiscovery {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    @Override
-    public Collection<PortStatistics> discoverPortStatistics() {
-        log.debug("Discovering Port Statistics for device {}", handler().data().deviceId());
-        return ImmutableList.of();
-    }
-}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/DefaultP4PortStatisticsDiscovery.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/DefaultP4PortStatisticsDiscovery.java
new file mode 100644
index 0000000..2946304
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/DefaultP4PortStatisticsDiscovery.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.p4runtime;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.onosproject.net.device.DefaultPortStatistics;
+import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.device.PortStatisticsDiscovery;
+import org.onosproject.net.pi.runtime.PiCounterCellData;
+import org.onosproject.net.pi.runtime.PiCounterCellId;
+import org.onosproject.net.pi.runtime.PiCounterId;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of a PortStatisticsBehaviour that can be used for any P4 program based on default.p4 (i.e. those
+ * under onos/tools/test/p4src).
+ */
+public class DefaultP4PortStatisticsDiscovery extends AbstractP4RuntimeHandlerBehaviour
+        implements PortStatisticsDiscovery {
+
+    private static final PiCounterId INGRESS_COUNTER_ID = PiCounterId.of("ingress_port_counter");
+    private static final PiCounterId EGRESS_COUNTER_ID = PiCounterId.of("egress_port_counter");
+
+    @Override
+    public Collection<PortStatistics> discoverPortStatistics() {
+
+        if (!super.setupBehaviour()) {
+            return Collections.emptyList();
+        }
+
+        Map<Long, DefaultPortStatistics.Builder> portStatBuilders = Maps.newHashMap();
+
+        deviceService.getPorts(deviceId)
+                .forEach(p -> portStatBuilders.put(p.number().toLong(),
+                                                   DefaultPortStatistics.builder()
+                                                           .setPort((int) p.number().toLong())
+                                                           .setDeviceId(deviceId)));
+
+        Set<PiCounterCellId> counterCellIds = Sets.newHashSet();
+        portStatBuilders.keySet().forEach(p -> {
+            // Counter cell/index = port number.
+            counterCellIds.add(PiCounterCellId.of(INGRESS_COUNTER_ID, p));
+            counterCellIds.add(PiCounterCellId.of(EGRESS_COUNTER_ID, p));
+        });
+
+        Collection<PiCounterCellData> counterEntryResponse;
+        try {
+            counterEntryResponse = client.readCounterCells(counterCellIds, pipeconf).get();
+        } catch (InterruptedException | ExecutionException e) {
+            log.warn("Exception while reading port counters from {}: {}", deviceId, e.toString());
+            log.debug("", e);
+            return Collections.emptyList();
+        }
+
+        counterEntryResponse.forEach(counterEntry -> {
+            if (!portStatBuilders.containsKey(counterEntry.cellId().index())) {
+                log.warn("Unrecognized counter index {}, skipping", counterEntry);
+                return;
+            }
+            DefaultPortStatistics.Builder statsBuilder = portStatBuilders.get(counterEntry.cellId().index());
+            if (counterEntry.cellId().counterId().equals(INGRESS_COUNTER_ID)) {
+                statsBuilder.setPacketsReceived(counterEntry.packets());
+                statsBuilder.setBytesReceived(counterEntry.bytes());
+            } else if (counterEntry.cellId().counterId().equals(EGRESS_COUNTER_ID)) {
+                statsBuilder.setPacketsSent(counterEntry.packets());
+                statsBuilder.setBytesSent(counterEntry.bytes());
+            } else {
+                log.warn("Unrecognized counter ID {}, skipping", counterEntry);
+            }
+        });
+
+        return portStatBuilders
+                .values()
+                .stream()
+                .map(DefaultPortStatistics.Builder::build)
+                .collect(Collectors.toList());
+    }
+}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
index 5405362..0bd19e5 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
@@ -18,11 +18,15 @@
 
 import com.google.common.annotations.Beta;
 import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.runtime.PiCounterCellData;
+import org.onosproject.net.pi.runtime.PiCounterCellId;
+import org.onosproject.net.pi.runtime.PiCounterId;
 import org.onosproject.net.pi.runtime.PiPacketOperation;
 import org.onosproject.net.pi.runtime.PiTableEntry;
 import org.onosproject.net.pi.runtime.PiTableId;
 
 import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -72,7 +76,7 @@
                                                  PiPipeconf pipeconf);
 
     /**
-     * Dumps all entries currently installed in the given table.
+     * Dumps all entries currently installed in the given table, for the given pipeconf.
      *
      * @param tableId  table identifier
      * @param pipeconf pipeconf currently deployed on the device
@@ -81,7 +85,7 @@
     CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId tableId, PiPipeconf pipeconf);
 
     /**
-     * Executes a packet-out operation.
+     * Executes a packet-out operation for the given pipeconf.
      *
      * @param packet   packet-out operation to be performed by the device
      * @param pipeconf pipeconf currently deployed on the device
@@ -90,6 +94,27 @@
     CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf);
 
     /**
+     * Returns the value of all counter cells for the given set of counter identifiers and pipeconf.
+     *
+     * @param counterIds counter identifiers
+     * @param pipeconf   pipeconf
+     * @return collection of counter data
+     */
+    CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
+                                                                         PiPipeconf pipeconf);
+
+    /**
+     * Returns a collection of counter data corresponding to the given set of counter cell identifiers, for the given
+     * pipeconf.
+     *
+     * @param cellIds set of counter cell identifiers
+     * @param pipeconf   pipeconf
+     * @return collection of counter data
+     */
+    CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
+                                                                      PiPipeconf pipeconf);
+
+    /**
      * Shutdown the client by terminating any active RPC such as the stream channel.
      */
     void shutdown();
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 26ec75a..00a2683 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -16,7 +16,9 @@
 
 package org.onosproject.p4runtime.ctl;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 import io.grpc.Context;
 import io.grpc.ManagedChannel;
@@ -26,6 +28,9 @@
 import org.onlab.osgi.DefaultServiceDirectory;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.runtime.PiCounterCellData;
+import org.onosproject.net.pi.runtime.PiCounterCellId;
+import org.onosproject.net.pi.runtime.PiCounterId;
 import org.onosproject.net.pi.runtime.PiPacketOperation;
 import org.onosproject.net.pi.runtime.PiPipeconfService;
 import org.onosproject.net.pi.runtime.PiTableEntry;
@@ -34,6 +39,7 @@
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.slf4j.Logger;
 import p4.P4RuntimeGrpc;
+import p4.P4RuntimeOuterClass.CounterEntry;
 import p4.P4RuntimeOuterClass.Entity;
 import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
 import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
@@ -56,6 +62,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -70,6 +77,7 @@
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType;
 import static org.slf4j.LoggerFactory.getLogger;
+import static p4.P4RuntimeOuterClass.Entity.EntityCase.COUNTER_ENTRY;
 import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
 import static p4.P4RuntimeOuterClass.PacketOut;
 import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
@@ -169,6 +177,25 @@
         return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
     }
 
+    @Override
+    public CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
+                                                                             PiPipeconf pipeconf) {
+        return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
+                               "readCounterCells-" + cellIds.hashCode());
+    }
+
+    @Override
+    public CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
+                                                                                PiPipeconf pipeconf) {
+        Set<PiCounterCellId> cellIds = counterIds.stream()
+                // Cell with index 0 means all cells.
+                .map(counterId -> PiCounterCellId.of(counterId, 0))
+                .collect(Collectors.toSet());
+
+        return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
+                               "readAllCounterCells-" + cellIds.hashCode());
+    }
+
     /* Blocking method implementations below */
 
     private boolean doInitStreamChannel() {
@@ -387,6 +414,66 @@
         log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg);
     }
 
+    private Collection<PiCounterCellData> doReadCounterCells(Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
+
+        // From p4runtime.proto:
+        // For ReadRequest, the scope is defined as follows:
+        // - All counter cells for all meters if counter_id = 0 (default).
+        // - All counter cells for given counter_id if index = 0 (default).
+
+        final ReadRequest.Builder requestBuilder = ReadRequest.newBuilder().setDeviceId(p4DeviceId);
+        final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
+        final Map<Integer, PiCounterId> counterIdMap = Maps.newHashMap();
+
+        for (PiCounterCellId cellId : cellIds) {
+            int counterId;
+            try {
+                counterId = browser.counters().getByNameOrAlias(cellId.counterId().id()).getPreamble().getId();
+            } catch (P4InfoBrowser.NotFoundException e) {
+                log.warn("Skipping counter cell {}: {}", cellId, e.getMessage());
+                continue;
+            }
+            requestBuilder
+                    .addEntities(Entity.newBuilder()
+                                         .setCounterEntry(CounterEntry.newBuilder()
+                                                                  .setCounterId(counterId)
+                                                                  .setIndex(cellId.index())
+                                                                  .build()));
+            counterIdMap.put(counterId, cellId.counterId());
+        }
+
+        final Iterator<ReadResponse> responses;
+        try {
+            responses = blockingStub.read(requestBuilder.build());
+        } catch (StatusRuntimeException e) {
+            log.warn("Unable to read counters: {}", e.getMessage());
+            return Collections.emptyList();
+        }
+
+        final Iterable<ReadResponse> responseIterable = () -> responses;
+        final ImmutableList.Builder<PiCounterCellData> piCounterEntryListBuilder = ImmutableList.builder();
+
+        StreamSupport
+                .stream(responseIterable.spliterator(), false)
+                .map(ReadResponse::getEntitiesList)
+                .flatMap(List::stream)
+                .filter(entity -> entity.getEntityCase() == COUNTER_ENTRY)
+                .map(Entity::getCounterEntry)
+                .forEach(counterEntryMsg -> {
+                    if (!counterIdMap.containsKey(counterEntryMsg.getCounterId())) {
+                        log.warn("Unrecognized counter ID '{}', skipping", counterEntryMsg.getCounterId());
+                        return;
+                    }
+                    PiCounterCellId cellId = PiCounterCellId.of(counterIdMap.get(counterEntryMsg.getCounterId()),
+                                                                counterEntryMsg.getIndex());
+                    piCounterEntryListBuilder.add(new PiCounterCellData(cellId,
+                                                                        counterEntryMsg.getData().getPacketCount(),
+                                                                        counterEntryMsg.getData().getByteCount()));
+                });
+
+        return piCounterEntryListBuilder.build();
+    }
+
     /**
      * Returns the internal P4 device ID associated with this client.
      *