New P4RuntimeClient implementation that supports batching and error reporting

The new client API supports batching and provides a detailed response
for write requests (e.g. if an entity already exists when inserting),
which was not possible with the old one.

This patch includes:
- New, more efficient implementation of P4RuntimeClient (no more locking,
use native gRPC executor, use stub deadlines)
- Ported all codecs to new AbstractCodec-based implementation (needed to
implement codec cache in the future)
- Uses batching in P4RuntimeFlowRuleProgrammable and
P4RuntimeActionGroupProgrammable
- Minor changes to PI framework runtime classes

Change-Id: I3fac42057bb4e1389d761006a32600c786598683
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
index 41c2b68..8a9a403 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
@@ -65,7 +65,7 @@
 
     /**
      * Initializes this behaviour attributes. Returns true if the operation was
-     * successful, false otherwise. This method assumes that the P4runtime
+     * successful, false otherwise. This method assumes that the P4Runtime
      * controller already has a client for this device and that the device has
      * been created in the core.
      *
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
index c7d76c0..664bd59 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
@@ -101,7 +101,7 @@
             return false;
         }
 
-        return client.isPipelineConfigSet(pipeconf, deviceDataBuffer);
+        return client.isPipelineConfigSetSync(pipeconf, deviceDataBuffer);
     }
 
     @Override
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
index d614d6e..fca05cb 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
@@ -16,14 +16,12 @@
 
 package org.onosproject.drivers.p4runtime;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Striped;
 import org.onlab.util.SharedExecutors;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileGroupMirror;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileMemberMirror;
+import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMirror;
 import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.group.DefaultGroup;
@@ -34,17 +32,18 @@
 import org.onosproject.net.group.GroupOperations;
 import org.onosproject.net.group.GroupProgrammable;
 import org.onosproject.net.group.GroupStore;
-import org.onosproject.net.pi.model.PiActionProfileId;
 import org.onosproject.net.pi.model.PiActionProfileModel;
 import org.onosproject.net.pi.runtime.PiActionProfileGroup;
 import org.onosproject.net.pi.runtime.PiActionProfileGroupHandle;
 import org.onosproject.net.pi.runtime.PiActionProfileMember;
 import org.onosproject.net.pi.runtime.PiActionProfileMemberHandle;
-import org.onosproject.net.pi.runtime.PiActionProfileMemberId;
+import org.onosproject.net.pi.runtime.PiEntity;
+import org.onosproject.net.pi.runtime.PiHandle;
 import org.onosproject.net.pi.service.PiGroupTranslator;
 import org.onosproject.net.pi.service.PiTranslatedEntity;
 import org.onosproject.net.pi.service.PiTranslationException;
-import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeReadClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -53,14 +52,10 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.singletonList;
 import static java.util.stream.Collectors.toMap;
-import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.DELETE;
-import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.INSERT;
-import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.MODIFY;
+import static java.util.stream.Collectors.toSet;
 
 /**
  * Implementation of GroupProgrammable to handle action profile groups in
@@ -80,9 +75,6 @@
     private P4RuntimeActionProfileMemberMirror memberMirror;
     private PiGroupTranslator groupTranslator;
 
-    // Needed to synchronize operations over the same group.
-    private static final Striped<Lock> STRIPED_LOCKS = Striped.lock(30);
-
     @Override
     protected boolean setupBehaviour() {
         if (!super.setupBehaviour()) {
@@ -134,25 +126,33 @@
         }
 
         // Dump groups and members from device for all action profiles.
-        final Set<PiActionProfileId> actionProfileIds = pipeconf.pipelineModel()
-                .actionProfiles()
-                .stream()
-                .map(PiActionProfileModel::id)
-                .collect(Collectors.toSet());
-        final Map<PiActionProfileGroupHandle, PiActionProfileGroup>
-                groupsOnDevice = dumpAllGroupsFromDevice(actionProfileIds);
+        final P4RuntimeReadClient.ReadRequest request = client.read(pipeconf);
+        pipeconf.pipelineModel().actionProfiles()
+                .stream().map(PiActionProfileModel::id)
+                .forEach(id -> request.actionProfileGroups(id)
+                        .actionProfileMembers(id));
+        final P4RuntimeReadClient.ReadResponse response = request.submitSync();
+
+        if (!response.isSuccess()) {
+            // Error at client level.
+            return Collections.emptyList();
+        }
+
+        final Collection<PiActionProfileGroup> groupsOnDevice = response.all(
+                PiActionProfileGroup.class);
         final Map<PiActionProfileMemberHandle, PiActionProfileMember> membersOnDevice =
-                dumpAllMembersFromDevice(actionProfileIds);
+                response.all(PiActionProfileMember.class).stream()
+                        .collect(toMap(m -> m.handle(deviceId), m -> m));
 
         // Sync mirrors.
         groupMirror.sync(deviceId, groupsOnDevice);
-        memberMirror.sync(deviceId, membersOnDevice);
+        memberMirror.sync(deviceId, membersOnDevice.values());
 
         // Retrieve the original PD group before translation.
         final List<Group> result = Lists.newArrayList();
         final List<PiActionProfileGroup> groupsToRemove = Lists.newArrayList();
         final Set<PiActionProfileMemberHandle> memberHandlesToKeep = Sets.newHashSet();
-        for (PiActionProfileGroup piGroup : groupsOnDevice.values()) {
+        for (PiActionProfileGroup piGroup : groupsOnDevice) {
             final Group pdGroup = checkAndForgeGroupEntry(piGroup, membersOnDevice);
             if (pdGroup == null) {
                 // Entry is on device but unknown to translation service or
@@ -168,13 +168,24 @@
             }
         }
 
-        // Trigger clean up of inconsistent groups and members. This will update
-        // the mirror accordingly.
+        // Trigger clean up of inconsistent groups and members (if any). This
+        // process takes care of removing any orphan member, e.g. from a
+        // partial/unsuccessful group insertion.
+        // This will update the mirror accordingly.
         final Set<PiActionProfileMemberHandle> memberHandlesToRemove = Sets.difference(
                 membersOnDevice.keySet(), memberHandlesToKeep);
-        SharedExecutors.getSingleThreadExecutor().execute(
-                () -> cleanUpInconsistentGroupsAndMembers(
-                        groupsToRemove, memberHandlesToRemove));
+        final Set<PiActionProfileGroupHandle> groupHandlesToRemove = groupsToRemove
+                .stream().map(g -> g.handle(deviceId)).collect(toSet());
+        if (groupHandlesToRemove.size() + memberHandlesToRemove.size() > 0) {
+            log.warn("Cleaning up {} action profile groups and " +
+                             "{} members on {}...",
+                     groupHandlesToRemove.size(), memberHandlesToRemove.size(), deviceId);
+            SharedExecutors.getSingleThreadExecutor().execute(
+                    () -> submitWriteRequestAndUpdateMirror(
+                            client.write(pipeconf)
+                                    .delete(groupHandlesToRemove)
+                                    .delete(memberHandlesToRemove)));
+        }
 
         // Done.
         return result;
@@ -182,7 +193,9 @@
 
     private Collection<Group> getGroupsFromMirror() {
         final Map<PiActionProfileMemberHandle, PiActionProfileMember> members =
-                memberMirror.deviceHandleMap(deviceId);
+                memberMirror.getAll(deviceId).stream()
+                .map(TimedEntry::entry)
+                .collect(toMap(e -> e.handle(deviceId), e -> e));
         return groupMirror.getAll(deviceId).stream()
                 .map(TimedEntry::entry)
                 .map(g -> checkAndForgeGroupEntry(
@@ -191,60 +204,6 @@
                 .collect(Collectors.toList());
     }
 
-    private void cleanUpInconsistentGroupsAndMembers(Collection<PiActionProfileGroup> groupsToRemove,
-                                                     Collection<PiActionProfileMemberHandle> membersToRemove) {
-        if (!groupsToRemove.isEmpty()) {
-            log.warn("Found {} inconsistent action profile groups on {}, removing them...",
-                     groupsToRemove.size(), deviceId);
-            groupsToRemove.forEach(piGroup -> {
-                log.debug(piGroup.toString());
-                processPiGroup(piGroup, null, Operation.REMOVE);
-            });
-        }
-        if (!membersToRemove.isEmpty()) {
-            log.warn("Found {} inconsistent action profile members on {}, removing them...",
-                     membersToRemove.size(), deviceId);
-            // FIXME: implement client call to remove members from multiple
-            // action profiles in one shot.
-            final ListMultimap<PiActionProfileId, PiActionProfileMemberId>
-                    membersByActProfId = ArrayListMultimap.create();
-            membersToRemove.forEach(m -> membersByActProfId.put(
-                    m.actionProfileId(), m.memberId()));
-            membersByActProfId.keySet().forEach(actProfId -> {
-                List<PiActionProfileMemberId> removedMembers = getFutureWithDeadline(
-                        client.removeActionProfileMembers(
-                                actProfId, membersByActProfId.get(actProfId), pipeconf),
-                        "cleaning up action profile members", Collections.emptyList());
-                // Update member mirror.
-                removedMembers.stream()
-                        .map(id -> PiActionProfileMemberHandle.of(deviceId, actProfId, id))
-                        .forEach(memberMirror::remove);
-            });
-        }
-    }
-
-    private Map<PiActionProfileGroupHandle, PiActionProfileGroup> dumpAllGroupsFromDevice(
-            Set<PiActionProfileId> actProfIds) {
-        // TODO: implement P4Runtime client call to read all groups with one call
-        // Good if pipeline has multiple action profiles.
-        return actProfIds.stream()
-                .flatMap(actProfId -> getFutureWithDeadline(
-                        client.dumpActionProfileGroups(actProfId, pipeconf),
-                        "dumping groups", Collections.emptyList()).stream())
-                .collect(toMap(g -> PiActionProfileGroupHandle.of(deviceId, g), g -> g));
-    }
-
-    private Map<PiActionProfileMemberHandle, PiActionProfileMember> dumpAllMembersFromDevice(
-            Set<PiActionProfileId> actProfIds) {
-        // TODO: implement P4Runtime client call to read all members with one call
-        // Good if pipeline has multiple action profiles.
-        return actProfIds.stream()
-                .flatMap(actProfId -> getFutureWithDeadline(
-                        client.dumpActionProfileMembers(actProfId, pipeconf),
-                        "dumping members", Collections.emptyList()).stream())
-                .collect(toMap(m -> PiActionProfileMemberHandle.of(deviceId, m), m -> m));
-    }
-
     private Group checkAndForgeGroupEntry(
             PiActionProfileGroup piGroupOnDevice,
             Map<PiActionProfileMemberHandle, PiActionProfileMember> membersOnDevice) {
@@ -269,7 +228,7 @@
         // Groups in P4Runtime contains only a reference to members. Check that
         // the actual member instances in the translation store are the same
         // found on the device.
-        if (!validateMembers(piGroupFromStore, membersOnDevice)) {
+        if (!validateGroupMembers(piGroupFromStore, membersOnDevice)) {
             log.warn("Group on device {} refers to members that are different " +
                              "than those found in translation store: {}", handle);
             return null;
@@ -282,7 +241,7 @@
         return addedGroup(translatedEntity.get().original(), mirrorEntry.lifeSec());
     }
 
-    private boolean validateMembers(
+    private boolean validateGroupMembers(
             PiActionProfileGroup piGroupFromStore,
             Map<PiActionProfileMemberHandle, PiActionProfileMember> membersOnDevice) {
         final Collection<PiActionProfileMember> groupMembers =
@@ -291,9 +250,8 @@
             return false;
         }
         return groupMembers.stream().allMatch(
-                memberFromStore -> memberFromStore.equals(
-                        membersOnDevice.get(
-                                PiActionProfileMemberHandle.of(deviceId, memberFromStore))));
+                memberFromStore -> memberFromStore.equals(membersOnDevice.get(
+                        memberFromStore.handle(deviceId))));
     }
 
     private Group addedGroup(Group original, long life) {
@@ -314,143 +272,79 @@
         }
         final Operation operation = opType.equals(GroupOperation.Type.DELETE)
                 ? Operation.REMOVE : Operation.APPLY;
-        processPiGroup(piGroup, pdGroup, operation);
-    }
-
-    private void processPiGroup(PiActionProfileGroup groupToApply,
-                                Group pdGroup,
-                                Operation operation) {
-        final PiActionProfileGroupHandle handle = PiActionProfileGroupHandle.of(deviceId, groupToApply);
-        STRIPED_LOCKS.get(handle).lock();
-        try {
-            switch (operation) {
-                case APPLY:
-                    if (applyGroupWithMembersOrNothing(groupToApply, handle)) {
-                        groupTranslator.learn(handle, new PiTranslatedEntity<>(
-                                pdGroup, groupToApply, handle));
-                    }
-                    return;
-                case REMOVE:
-                    if (deleteGroup(groupToApply, handle)) {
-                        groupTranslator.forget(handle);
-                    }
-                    return;
-                default:
-                    log.error("Unknwon group operation type {}, cannot process group", operation);
-                    break;
+        final PiActionProfileGroupHandle handle = piGroup.handle(deviceId);
+        if (writePiGroupOnDevice(piGroup, handle, operation)) {
+            if (operation.equals(Operation.APPLY)) {
+                groupTranslator.learn(handle, new PiTranslatedEntity<>(
+                        pdGroup, piGroup, handle));
+            } else {
+                groupTranslator.forget(handle);
             }
-        } finally {
-            STRIPED_LOCKS.get(handle).unlock();
         }
     }
 
-    private boolean applyGroupWithMembersOrNothing(PiActionProfileGroup group, PiActionProfileGroupHandle handle) {
-        // First apply members, then group, if fails, delete members.
-        Collection<PiActionProfileMember> members = extractAllMemberInstancesOrNull(group);
+    private boolean writePiGroupOnDevice(
+            PiActionProfileGroup group,
+            PiActionProfileGroupHandle groupHandle,
+            Operation operation) {
+        // Generate a write request to write both members and groups. Return
+        // true if request is successful or if there's no need to write on
+        // device (according to mirror state), otherwise, return false.
+        final Collection<PiActionProfileMember> members = extractAllMemberInstancesOrNull(group);
         if (members == null) {
             return false;
         }
-        if (!applyAllMembersOrNothing(members)) {
-            return false;
-        }
-        if (!applyGroup(group, handle)) {
-            deleteMembers(handles(members));
-            return false;
-        }
-        return true;
-    }
-
-    private boolean applyGroup(PiActionProfileGroup groupToApply, PiActionProfileGroupHandle handle) {
-        final TimedEntry<PiActionProfileGroup> groupOnDevice = groupMirror.get(handle);
-        final P4RuntimeClient.WriteOperationType opType =
-                groupOnDevice == null ? INSERT : MODIFY;
-        if (opType.equals(MODIFY) && groupToApply.equals(groupOnDevice.entry())) {
-            // Skip writing, group is unchanged.
-            return true;
-        }
-        final boolean success = getFutureWithDeadline(
-                client.writeActionProfileGroup(groupToApply, opType, pipeconf),
-                "performing action profile group " + opType, false);
-        if (success) {
-            groupMirror.put(handle, groupToApply);
-        }
-        return success;
-    }
-
-    private boolean deleteGroup(PiActionProfileGroup group, PiActionProfileGroupHandle handle) {
-        final boolean success = getFutureWithDeadline(
-                client.writeActionProfileGroup(group, DELETE, pipeconf),
-                "performing action profile group " + DELETE, false);
-        if (success) {
-            groupMirror.remove(handle);
-        }
-        // Orphan members will be removed at the next reconciliation cycle.
-        return success;
-    }
-
-    private boolean applyAllMembersOrNothing(Collection<PiActionProfileMember> members) {
-        Collection<PiActionProfileMember> appliedMembers = applyMembers(members);
-        if (appliedMembers.size() == members.size()) {
+        final P4RuntimeWriteClient.WriteRequest request = client.write(pipeconf);
+        // FIXME: when operation is remove, should we remove members first? Same
+        //  thing when modifying a group, should we first modify the group then
+        //  remove the member?
+        final boolean allMembersSkipped = members.stream()
+                .allMatch(m -> appendEntityToWriteRequestOrSkip(
+                        request, m.handle(deviceId), m, memberMirror, operation));
+        final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
+                request, groupHandle, group, groupMirror, operation);
+        if (allMembersSkipped && groupSkipped) {
             return true;
         } else {
-            deleteMembers(handles(appliedMembers));
-            return false;
+            // True if all entities in the request (groups and members) were
+            // written successfully.
+            return submitWriteRequestAndUpdateMirror(request).isSuccess();
         }
     }
 
-    private Collection<PiActionProfileMember> applyMembers(
-            Collection<PiActionProfileMember> members) {
-        return members.stream()
-                .filter(this::applyMember)
-                .collect(Collectors.toList());
-    }
-
-    private boolean applyMember(PiActionProfileMember memberToApply) {
-        // If exists, modify, otherwise insert.
-        final PiActionProfileMemberHandle handle = PiActionProfileMemberHandle.of(
-                deviceId, memberToApply);
-        final TimedEntry<PiActionProfileMember> memberOnDevice = memberMirror.get(handle);
-        final P4RuntimeClient.WriteOperationType opType =
-                memberOnDevice == null ? INSERT : MODIFY;
-        if (opType.equals(MODIFY) && memberToApply.equals(memberOnDevice.entry())) {
-            // Skip writing if member is unchanged.
-            return true;
+    private <H extends PiHandle, E extends PiEntity> boolean appendEntityToWriteRequestOrSkip(
+            P4RuntimeWriteClient.WriteRequest writeRequest,
+            H handle,
+            E entityToApply,
+            P4RuntimeMirror<H, E> mirror,
+            Operation operation) {
+        // Should return true if there's no need to write entity on device,
+        // false if the write request is modified or an error occurs.
+        final TimedEntry<E> entityOnDevice = mirror.get(handle);
+        switch (operation) {
+            case APPLY:
+                if (entityOnDevice == null) {
+                    writeRequest.insert(entityToApply);
+                } else if (entityToApply.equals(entityOnDevice.entry())) {
+                    // Skip writing if entity is unchanged.
+                    return true;
+                } else {
+                    writeRequest.modify(entityToApply);
+                }
+                break;
+            case REMOVE:
+                if (entityOnDevice == null) {
+                    // Skip deleting if entity does not exist on device.
+                    return true;
+                } else {
+                    writeRequest.delete(handle);
+                }
+                break;
+            default:
+                log.error("Unrecognized operation {}", operation);
+                break;
         }
-        final boolean success = getFutureWithDeadline(
-                client.writeActionProfileMembers(
-                        singletonList(memberToApply), opType, pipeconf),
-                "performing action profile member " + opType, false);
-        if (success) {
-            memberMirror.put(handle, memberToApply);
-        }
-        return success;
-    }
-
-    private void deleteMembers(Collection<PiActionProfileMemberHandle> handles) {
-        // TODO: improve by batching deletes.
-        handles.forEach(this::deleteMember);
-    }
-
-    private void deleteMember(PiActionProfileMemberHandle handle) {
-        final boolean success = getFutureWithDeadline(
-                client.removeActionProfileMembers(
-                        handle.actionProfileId(),
-                        singletonList(handle.memberId()), pipeconf),
-                "performing action profile member " + DELETE,
-                Collections.emptyList())
-                // Successful if the only member passed has been removed.
-                .size() == 1;
-        if (success) {
-            memberMirror.remove(handle);
-        }
-    }
-
-    private Collection<PiActionProfileMemberHandle> handles(
-            Collection<PiActionProfileMember> members) {
-        return members.stream()
-                .map(m -> PiActionProfileMemberHandle.of(
-                        deviceId, m.actionProfile(), m.id()))
-                .collect(Collectors.toList());
+        return false;
     }
 
     private Collection<PiActionProfileMember> extractAllMemberInstancesOrNull(
@@ -468,6 +362,14 @@
         return instances;
     }
 
+    private P4RuntimeWriteClient.WriteResponse submitWriteRequestAndUpdateMirror(
+            P4RuntimeWriteClient.WriteRequest request) {
+        final P4RuntimeWriteClient.WriteResponse response = request.submitSync();
+        groupMirror.replayWriteResponse(response);
+        memberMirror.replayWriteResponse(response);
+        return response;
+    }
+
     enum Operation {
         APPLY, REMOVE
     }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index 693353f..f905c4c 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -19,7 +19,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Striped;
 import org.onlab.util.SharedExecutors;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeTableMirror;
 import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
@@ -27,19 +26,24 @@
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleProgrammable;
+import org.onosproject.net.pi.model.PiCounterType;
 import org.onosproject.net.pi.model.PiPipelineInterpreter;
 import org.onosproject.net.pi.model.PiPipelineModel;
 import org.onosproject.net.pi.model.PiTableId;
-import org.onosproject.net.pi.model.PiTableModel;
 import org.onosproject.net.pi.runtime.PiCounterCell;
 import org.onosproject.net.pi.runtime.PiCounterCellData;
+import org.onosproject.net.pi.runtime.PiCounterCellHandle;
 import org.onosproject.net.pi.runtime.PiCounterCellId;
+import org.onosproject.net.pi.runtime.PiEntityType;
+import org.onosproject.net.pi.runtime.PiHandle;
 import org.onosproject.net.pi.runtime.PiTableEntry;
 import org.onosproject.net.pi.runtime.PiTableEntryHandle;
 import org.onosproject.net.pi.service.PiFlowRuleTranslator;
 import org.onosproject.net.pi.service.PiTranslatedEntity;
 import org.onosproject.net.pi.service.PiTranslationException;
-import org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType;
+import org.onosproject.p4runtime.api.P4RuntimeReadClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -48,18 +52,14 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
-import static com.google.common.collect.Lists.newArrayList;
 import static org.onosproject.drivers.p4runtime.P4RuntimeFlowRuleProgrammable.Operation.APPLY;
 import static org.onosproject.drivers.p4runtime.P4RuntimeFlowRuleProgrammable.Operation.REMOVE;
 import static org.onosproject.net.flow.FlowEntry.FlowEntryState.ADDED;
-import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.DELETE;
-import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.INSERT;
-import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.MODIFY;
+import static org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType.DELETE;
+import static org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType.INSERT;
+import static org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType.MODIFY;
 
 /**
  * Implementation of the flow rule programmable behaviour for P4Runtime.
@@ -75,11 +75,6 @@
     private static final String DELETE_BEFORE_UPDATE = "tableDeleteBeforeUpdate";
     private static final boolean DEFAULT_DELETE_BEFORE_UPDATE = false;
 
-    // If true, we ignore re-installing rules that already exist in the
-    // device mirror, i.e. same match key and action.
-    private static final String IGNORE_SAME_ENTRY_UPDATE = "tableIgnoreSameEntryUpdate";
-    private static final boolean DEFAULT_IGNORE_SAME_ENTRY_UPDATE = false;
-
     // If true, we avoid querying the device and return what's already known by
     // the ONOS store.
     private static final String READ_FROM_MIRROR = "tableReadFromMirror";
@@ -102,9 +97,6 @@
     private static final String TABLE_DEFAULT_AS_ENTRY = "tableDefaultAsEntry";
     private static final boolean DEFAULT_TABLE_DEFAULT_AS_ENTRY = false;
 
-    // Needed to synchronize operations over the same table entry.
-    private static final Striped<Lock> ENTRY_LOCKS = Striped.lock(30);
-
     private PiPipelineModel pipelineModel;
     private P4RuntimeTableMirror tableMirror;
     private PiFlowRuleTranslator translator;
@@ -136,24 +128,21 @@
         final ImmutableList.Builder<FlowEntry> result = ImmutableList.builder();
         final List<PiTableEntry> inconsistentEntries = Lists.newArrayList();
 
-        // Read table entries, including default ones.
-        final Collection<PiTableEntry> deviceEntries = Stream.concat(
-                streamEntries(), streamDefaultEntries())
-                // Ignore entries from constant tables.
-                .filter(e -> !tableIsConstant(e.table()))
-                // Device implementation might return duplicate entries. For
-                // example if reading only default ones is not supported and
-                // non-default entries are returned, by using distinct() we are
-                // robust against that possibility.
-                .distinct()
-                .collect(Collectors.toList());
-
-        if (deviceEntries.isEmpty()) {
+        // Read table entries from device.
+        final Collection<PiTableEntry> deviceEntries = getAllTableEntriesFromDevice();
+        if (deviceEntries == null) {
+            // Potential error at the client level.
             return Collections.emptyList();
         }
 
         // Synchronize mirror with the device state.
-        syncMirror(deviceEntries);
+        tableMirror.sync(deviceId, deviceEntries);
+
+        if (deviceEntries.isEmpty()) {
+            // Nothing to do.
+            return Collections.emptyList();
+        }
+
         final Map<PiTableEntry, PiCounterCellData> counterCellMap =
                 readEntryCounters(deviceEntries);
         // Forge flow entries with counter values.
@@ -174,7 +163,7 @@
             }
         }
 
-        if (inconsistentEntries.size() > 0) {
+        if (!inconsistentEntries.isEmpty()) {
             // Trigger clean up of inconsistent entries.
             SharedExecutors.getSingleThreadExecutor().execute(
                     () -> cleanUpInconsistentEntries(inconsistentEntries));
@@ -183,32 +172,28 @@
         return result.build();
     }
 
-    private Stream<PiTableEntry> streamEntries() {
-        return getFutureWithDeadline(
-                client.dumpAllTables(pipeconf), "dumping all tables",
-                Collections.emptyList())
-                .stream();
-    }
-
-    private Stream<PiTableEntry> streamDefaultEntries() {
-        // Ignore tables with constant default action.
-        final Set<PiTableId> defaultTables = pipelineModel.tables()
-                .stream()
-                .filter(table -> !table.constDefaultAction().isPresent())
-                .map(PiTableModel::id)
-                .collect(Collectors.toSet());
-        return defaultTables.isEmpty() ? Stream.empty()
-                : getFutureWithDeadline(
-                client.dumpTables(defaultTables, true, pipeconf),
-                "dumping default table entries",
-                Collections.emptyList())
-                .stream();
-    }
-
-    private void syncMirror(Collection<PiTableEntry> entries) {
-        Map<PiTableEntryHandle, PiTableEntry> handleMap = Maps.newHashMap();
-        entries.forEach(e -> handleMap.put(PiTableEntryHandle.of(deviceId, e), e));
-        tableMirror.sync(deviceId, handleMap);
+    private Collection<PiTableEntry> getAllTableEntriesFromDevice() {
+        final P4RuntimeReadClient.ReadRequest request = client.read(pipeconf);
+        // Read entries from all non-constant tables, including default ones.
+        pipelineModel.tables().stream()
+                .filter(t -> !t.isConstantTable())
+                .forEach(t -> {
+                    request.tableEntries(t.id());
+                    if (!t.constDefaultAction().isPresent()) {
+                        request.defaultTableEntry(t.id());
+                    }
+                });
+        final P4RuntimeReadClient.ReadResponse response = request.submitSync();
+        if (!response.isSuccess()) {
+            return null;
+        }
+        return response.all(PiTableEntry.class).stream()
+                // Device implementation might return duplicate entries. For
+                // example if reading only default ones is not supported and
+                // non-default entries are returned, by using distinct() we
+                // are robust against that possibility.
+                .distinct()
+                .collect(Collectors.toList());
     }
 
     @Override
@@ -223,8 +208,7 @@
 
     private FlowEntry forgeFlowEntry(PiTableEntry entry,
                                      PiCounterCellData cellData) {
-        final PiTableEntryHandle handle = PiTableEntryHandle
-                .of(deviceId, entry);
+        final PiTableEntryHandle handle = entry.handle(deviceId);
         final Optional<PiTranslatedEntity<FlowRule, PiTableEntry>>
                 translatedEntity = translator.lookup(handle);
         final TimedEntry<PiTableEntry> timedEntry = tableMirror.get(handle);
@@ -265,105 +249,157 @@
     private void cleanUpInconsistentEntries(Collection<PiTableEntry> piEntries) {
         log.warn("Found {} inconsistent table entries on {}, removing them...",
                  piEntries.size(), deviceId);
-        piEntries.forEach(entry -> {
-            log.debug(entry.toString());
-            final PiTableEntryHandle handle = PiTableEntryHandle.of(deviceId, entry);
-            ENTRY_LOCKS.get(handle).lock();
-            try {
-                applyEntry(handle, entry, null, REMOVE);
-            } finally {
-                ENTRY_LOCKS.get(handle).unlock();
-            }
-        });
+        // Remove entries and update mirror.
+        tableMirror.replayWriteResponse(
+                client.write(pipeconf).entities(piEntries, DELETE).submitSync());
     }
 
     private Collection<FlowRule> processFlowRules(Collection<FlowRule> rules,
                                                   Operation driverOperation) {
-
         if (!setupBehaviour() || rules.isEmpty()) {
             return Collections.emptyList();
         }
-
-        final ImmutableList.Builder<FlowRule> result = ImmutableList.builder();
-
-        // TODO: send writes in bulk (e.g. all entries to insert, modify or delete).
-        // Instead of calling the client for each one of them.
-
-        for (FlowRule ruleToApply : rules) {
-
-            final PiTableEntry piEntryToApply;
+        // Create batched write request.
+        final P4RuntimeWriteClient.WriteRequest request = client.write(pipeconf);
+        // For each rule, translate to PI and append to write request.
+        final Map<PiHandle, FlowRule> handleToRuleMap = Maps.newHashMap();
+        final List<FlowRule> skippedRules = Lists.newArrayList();
+        for (FlowRule rule : rules) {
+            final PiTableEntry entry;
             try {
-                piEntryToApply = translator.translate(ruleToApply, pipeconf);
+                entry = translator.translate(rule, pipeconf);
             } catch (PiTranslationException e) {
-                log.warn("Unable to translate flow rule for pipeconf '{}': {} - {}",
-                         pipeconf.id(), e.getMessage(), ruleToApply);
+                log.warn("Unable to translate flow rule for pipeconf '{}': {} [{}]",
+                         pipeconf.id(), e.getMessage(), rule);
                 // Next rule.
                 continue;
             }
-
-            final PiTableEntryHandle handle = PiTableEntryHandle
-                    .of(deviceId, piEntryToApply);
-
-            // Serialize operations over the same match key/table/device ID.
-            ENTRY_LOCKS.get(handle).lock();
-            try {
-                if (applyEntry(handle, piEntryToApply,
-                               ruleToApply, driverOperation)) {
-                    result.add(ruleToApply);
-                }
-            } finally {
-                ENTRY_LOCKS.get(handle).unlock();
+            final PiTableEntryHandle handle = entry.handle(deviceId);
+            handleToRuleMap.put(handle, rule);
+            // Append entry to batched write request (returns false), or skip (true)
+            if (appendEntryToWriteRequestOrSkip(
+                    request, handle, entry, driverOperation)) {
+                skippedRules.add(rule);
+                updateTranslationStore(
+                        driverOperation, handle, rule, entry);
             }
         }
-
-        return result.build();
+        // Submit request to server.
+        final P4RuntimeWriteClient.WriteResponse response = request.submitSync();
+        // Update mirror.
+        tableMirror.replayWriteResponse(response);
+        // Derive successfully applied flow rules from response.
+        final List<FlowRule> appliedRules = getAppliedFlowRulesAndUpdateTranslator(
+                response, handleToRuleMap, driverOperation);
+        // Return skipped and applied rules.
+        return ImmutableList.<FlowRule>builder()
+                .addAll(skippedRules).addAll(appliedRules).build();
     }
 
-    /**
-     * Applies the given entry to the device, and returns true if the operation
-     * was successful, false otherwise.
-     */
-    private boolean applyEntry(final PiTableEntryHandle handle,
-                               PiTableEntry piEntryToApply,
-                               final FlowRule ruleToApply,
-                               final Operation driverOperation) {
+    private List<FlowRule> getAppliedFlowRulesAndUpdateTranslator(
+            P4RuntimeWriteClient.WriteResponse response,
+            Map<PiHandle, FlowRule> handleToFlowRuleMap,
+            Operation driverOperation) {
+        // Returns a list of flow rules that were successfully written on the
+        // server according to the given write response and operation.
+        return response.success().stream()
+                .filter(r -> r.entityType().equals(PiEntityType.TABLE_ENTRY))
+                .map(r -> {
+                    final PiHandle handle = r.handle();
+                    final FlowRule rule = handleToFlowRuleMap.get(handle);
+                    if (rule == null) {
+                        log.error("Server returned unrecognized table entry " +
+                                          "handle in write response: {}", handle);
+                        return null;
+                    }
+                    // Filter intermediate responses (e.g. P4Runtime DELETE
+                    // during FlowRule APPLY because we are performing
+                    // delete-before-update)
+                    if (isUpdateTypeRelevant(r.updateType(), driverOperation)) {
+                        updateTranslationStore(
+                                driverOperation, (PiTableEntryHandle) handle,
+                                rule, (PiTableEntry) r.entity());
+                        return rule;
+                    }
+                    return null;
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+    }
+
+    private boolean isUpdateTypeRelevant(UpdateType p4UpdateType, Operation driverOperation) {
+        switch (p4UpdateType) {
+            case INSERT:
+            case MODIFY:
+                if (!driverOperation.equals(APPLY)) {
+                    return false;
+                }
+                break;
+            case DELETE:
+                if (!driverOperation.equals(REMOVE)) {
+                    return false;
+                }
+                break;
+            default:
+                log.error("Unknown update type {}", p4UpdateType);
+                return false;
+        }
+        return true;
+    }
+
+    private void updateTranslationStore(
+            Operation operation, PiTableEntryHandle handle,
+            FlowRule rule, PiTableEntry entry) {
+        if (operation.equals(APPLY)) {
+            translator.learn(handle, new PiTranslatedEntity<>(
+                    rule, entry, handle));
+        } else {
+            translator.forget(handle);
+        }
+    }
+
+    private boolean appendEntryToWriteRequestOrSkip(
+            final P4RuntimeWriteClient.WriteRequest writeRequest,
+            final PiTableEntryHandle handle,
+            PiTableEntry piEntryToApply,
+            final Operation driverOperation) {
         // Depending on the driver operation, and if a matching rule exists on
-        // the device, decide which P4 Runtime write operation to perform for
-        // this entry.
+        // the device/mirror, decide which P4Runtime update operation to perform
+        // for this entry. In some cases, the entry is skipped from the write
+        // request but we want to return the corresponding flow rule as
+        // successfully written. In this case, we return true.
         final TimedEntry<PiTableEntry> piEntryOnDevice = tableMirror.get(handle);
-        final WriteOperationType p4Operation;
-        final WriteOperationType storeOperation;
+        final UpdateType updateType;
 
         final boolean defaultAsEntry = driverBoolProperty(
                 TABLE_DEFAULT_AS_ENTRY, DEFAULT_TABLE_DEFAULT_AS_ENTRY);
-        final boolean ignoreSameEntryUpdate = driverBoolProperty(
-                IGNORE_SAME_ENTRY_UPDATE, DEFAULT_IGNORE_SAME_ENTRY_UPDATE);
         final boolean deleteBeforeUpdate = driverBoolProperty(
                 DELETE_BEFORE_UPDATE, DEFAULT_DELETE_BEFORE_UPDATE);
+
         if (driverOperation == APPLY) {
             if (piEntryOnDevice == null) {
                 // Entry is first-timer, INSERT or MODIFY if default action.
-                p4Operation = !piEntryToApply.isDefaultAction() || defaultAsEntry
+                updateType = !piEntryToApply.isDefaultAction() || defaultAsEntry
                         ? INSERT : MODIFY;
-                storeOperation = p4Operation;
             } else {
-                if (ignoreSameEntryUpdate &&
-                        piEntryToApply.action().equals(piEntryOnDevice.entry().action())) {
+                if (piEntryToApply.action().equals(piEntryOnDevice.entry().action())) {
+                    // FIXME: should we check for other attributes of the table
+                    //  entry? For example can we modify the priority?
                     log.debug("Ignoring re-apply of existing entry: {}", piEntryToApply);
-                    p4Operation = null;
+                    return true;
                 } else if (deleteBeforeUpdate && !piEntryToApply.isDefaultAction()) {
-                    // Some devices return error when updating existing
-                    // entries. If requested, remove entry before
-                    // re-inserting the modified one, except the default action
-                    // entry, that cannot be removed.
-                    applyEntry(handle, piEntryOnDevice.entry(), null, REMOVE);
-                    p4Operation = INSERT;
+                    // Some devices return error when updating existing entries.
+                    // If requested, remove entry before re-inserting the
+                    // modified one, except the default action entry, that
+                    // cannot be removed.
+                    writeRequest.delete(handle);
+                    updateType = INSERT;
                 } else {
-                    p4Operation = MODIFY;
+                    updateType = MODIFY;
                 }
-                storeOperation = p4Operation;
             }
         } else {
+            // REMOVE.
             if (piEntryToApply.isDefaultAction()) {
                 // Cannot remove default action. Instead we should use the
                 // original defined by the interpreter (if any).
@@ -371,26 +407,13 @@
                 if (piEntryToApply == null) {
                     return false;
                 }
-                p4Operation = MODIFY;
+                updateType = MODIFY;
             } else {
-                p4Operation = DELETE;
+                updateType = DELETE;
             }
-            // Still want to delete the default entry from the mirror and
-            // translation store.
-            storeOperation = DELETE;
         }
-
-        if (p4Operation != null) {
-            if (writeEntry(piEntryToApply, p4Operation)) {
-                updateStores(handle, piEntryToApply, ruleToApply, storeOperation);
-                return true;
-            } else {
-                return false;
-            }
-        } else {
-            // If no operation, let's pretend we applied the rule to the device.
-            return true;
-        }
+        writeRequest.entity(piEntryToApply, updateType);
+        return false;
     }
 
     private PiTableEntry getOriginalDefaultEntry(PiTableId tableId) {
@@ -421,38 +444,6 @@
                 originalDefaultEntry.action().equals(entry.action());
     }
 
-    /**
-     * Performs a write operation on the device.
-     */
-    private boolean writeEntry(PiTableEntry entry,
-                               WriteOperationType p4Operation) {
-        final CompletableFuture<Boolean> future = client.writeTableEntries(
-                newArrayList(entry), p4Operation, pipeconf);
-        // If false, errors logged by internal calls.
-        return getFutureWithDeadline(
-                future, "performing table " + p4Operation.name(), false);
-    }
-
-    private void updateStores(PiTableEntryHandle handle,
-                              PiTableEntry entry,
-                              FlowRule rule,
-                              WriteOperationType p4Operation) {
-        switch (p4Operation) {
-            case INSERT:
-            case MODIFY:
-                tableMirror.put(handle, entry);
-                translator.learn(handle, new PiTranslatedEntity<>(rule, entry, handle));
-                break;
-            case DELETE:
-                tableMirror.remove(handle);
-                translator.forget(handle);
-                break;
-            default:
-                throw new IllegalArgumentException(
-                        "Unknown operation " + p4Operation.name());
-        }
-    }
-
     private Map<PiTableEntry, PiCounterCellData> readEntryCounters(
             Collection<PiTableEntry> tableEntries) {
         if (!driverBoolProperty(SUPPORT_TABLE_COUNTERS,
@@ -461,22 +452,44 @@
             return Collections.emptyMap();
         }
 
-        if (driverBoolProperty(READ_COUNTERS_WITH_TABLE_ENTRIES,
-                               DEFAULT_READ_COUNTERS_WITH_TABLE_ENTRIES)) {
-            return tableEntries.stream().collect(Collectors.toMap(c -> c, PiTableEntry::counter));
-        } else {
-            Collection<PiCounterCell> cells;
-            Set<PiCounterCellId> cellIds = tableEntries.stream()
-                    // Ignore counter for default entry.
-                    .filter(e -> !e.isDefaultAction())
-                    .filter(e -> tableHasCounter(e.table()))
-                    .map(PiCounterCellId::ofDirect)
-                    .collect(Collectors.toSet());
-            cells = getFutureWithDeadline(client.readCounterCells(cellIds, pipeconf),
-                                              "reading table counters", Collections.emptyList());
-            return cells.stream()
-                    .collect(Collectors.toMap(c -> c.cellId().tableEntry(), PiCounterCell::data));
+        final Map<PiTableEntry, PiCounterCellData> cellDataMap = Maps.newHashMap();
+
+        // We expect the server to return table entries with counter data (if
+        // the table supports counters). Here we extract such counter data and
+        // determine if there are missing counter cells (if, for example, the
+        // server does not support returning counter data with table entries).
+        final Set<PiHandle> missingCellHandles = tableEntries.stream()
+                .map(t -> {
+                    if (t.counter() != null) {
+                        // Counter data found in table entry.
+                        cellDataMap.put(t, t.counter());
+                        return null;
+                    } else {
+                        return t;
+                    }
+                })
+                .filter(Objects::nonNull)
+                // Ignore for default entries and for tables without counters.
+                .filter(e -> !e.isDefaultAction())
+                .filter(e -> tableHasCounter(e.table()))
+                .map(PiCounterCellId::ofDirect)
+                .map(id -> PiCounterCellHandle.of(deviceId, id))
+                .collect(Collectors.toSet());
+        // We might be sending a large read request (for thousands or more
+        // of counter cell handles). We request the driver to vet this
+        // operation via driver property.
+        if (!missingCellHandles.isEmpty()
+                && !driverBoolProperty(READ_COUNTERS_WITH_TABLE_ENTRIES,
+                                       DEFAULT_READ_COUNTERS_WITH_TABLE_ENTRIES)) {
+            client.read(pipeconf)
+                    .handles(missingCellHandles)
+                    .submitSync()
+                    .all(PiCounterCell.class).stream()
+                    .filter(c -> c.cellId().counterType().equals(PiCounterType.DIRECT))
+                    .forEach(c -> cellDataMap.put(c.cellId().tableEntry(), c.data()));
         }
+
+        return cellDataMap;
     }
 
     private boolean tableHasCounter(PiTableId tableId) {
@@ -484,11 +497,6 @@
                 !pipelineModel.table(tableId).get().counters().isEmpty();
     }
 
-    private boolean tableIsConstant(PiTableId tableId) {
-        return pipelineModel.table(tableId).isPresent() &&
-                pipelineModel.table(tableId).get().isConstantTable();
-    }
-
     enum Operation {
         APPLY, REMOVE
     }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
index 15836b1..4d84b2a 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
@@ -35,11 +35,12 @@
     public CompletableFuture<Boolean> connect() {
         return CompletableFuture
                 .supplyAsync(super::createClient)
-                .thenComposeAsync(client -> {
+                .thenApplyAsync(client -> {
                     if (client == null) {
-                        return CompletableFuture.completedFuture(false);
+                        return false;
                     }
-                    return client.startStreamChannel();
+                    client.openSession();
+                    return true;
                 });
     }
 
@@ -48,7 +49,7 @@
         final P4RuntimeController controller = handler().get(P4RuntimeController.class);
         final DeviceId deviceId = handler().data().deviceId();
         final P4RuntimeClient client = controller.getClient(deviceId);
-        return client != null && client.isStreamChannelOpen();
+        return client != null && client.isSessionOpen();
     }
 
     @Override
@@ -85,12 +86,7 @@
     @Override
     public void roleChanged(MastershipRole newRole) {
         if (setupBehaviour() && newRole.equals(MastershipRole.MASTER)) {
-            client.becomeMaster().thenAcceptAsync(result -> {
-                if (!result) {
-                    log.error("Unable to notify mastership role {} to {}",
-                              newRole, deviceId);
-                }
-            });
+            client.runForMastership();
         }
     }
 
@@ -99,7 +95,7 @@
         final P4RuntimeController controller = handler().get(P4RuntimeController.class);
         final DeviceId deviceId = handler().data().deviceId();
         final P4RuntimeClient client = controller.getClient(deviceId);
-        if (client == null || !client.isStreamChannelOpen()) {
+        if (client == null || !client.isSessionOpen()) {
             return MastershipRole.NONE;
         }
         return client.isMaster() ? MastershipRole.MASTER : MastershipRole.STANDBY;
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
index f2dff80..c9c436d 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
@@ -31,7 +31,7 @@
 import org.onosproject.net.pi.model.PiMeterModel;
 import org.onosproject.net.pi.model.PiPipelineModel;
 import org.onosproject.net.pi.runtime.PiMeterCellConfig;
-import org.onosproject.net.pi.runtime.PiMeterHandle;
+import org.onosproject.net.pi.runtime.PiMeterCellHandle;
 import org.onosproject.net.pi.service.PiMeterTranslator;
 import org.onosproject.net.pi.service.PiTranslationException;
 
@@ -40,26 +40,23 @@
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
-import static com.google.common.collect.Lists.newArrayList;
-
 /**
  * Implementation of MeterProgrammable behaviour for P4Runtime.
  */
 public class P4RuntimeMeterProgrammable extends AbstractP4RuntimeHandlerBehaviour implements MeterProgrammable {
 
     private static final int METER_LOCK_EXPIRE_TIME_IN_MIN = 10;
-    private static final LoadingCache<PiMeterHandle, Lock>
+    private static final LoadingCache<PiMeterCellHandle, Lock>
             ENTRY_LOCKS = CacheBuilder.newBuilder()
             .expireAfterAccess(METER_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
-            .build(new CacheLoader<PiMeterHandle, Lock>() {
+            .build(new CacheLoader<PiMeterCellHandle, Lock>() {
                 @Override
-                public Lock load(PiMeterHandle handle) {
+                public Lock load(PiMeterCellHandle handle) {
                     return new ReentrantLock();
                 }
             });
@@ -93,7 +90,7 @@
     private boolean processMeterOp(MeterOperation meterOp) {
 
         if (meterOp.type() != MeterOperation.Type.MODIFY) {
-            log.warn("P4runtime meter operations must be MODIFY!");
+            log.warn("P4Runtime meter operations must be MODIFY!");
             return false;
         }
 
@@ -106,11 +103,10 @@
             return false;
         }
 
-        final PiMeterHandle handle = PiMeterHandle.of(deviceId, piMeterCellConfig);
+        final PiMeterCellHandle handle = PiMeterCellHandle.of(deviceId, piMeterCellConfig);
         ENTRY_LOCKS.getUnchecked(handle).lock();
-        final boolean result = getFutureWithDeadline(
-                client.writeMeterCells(newArrayList(piMeterCellConfig), pipeconf),
-                "writing meter cells", false);
+        final boolean result = client.write(pipeconf)
+                .modify(piMeterCellConfig).submitSync().isSuccess();
         if (result) {
             meterMirror.put(handle, piMeterCellConfig);
         }
@@ -133,13 +129,8 @@
             meterIds.add(mode.id());
         }
 
-        try {
-            piMeterCellConfigs = client.readAllMeterCells(meterIds, pipeconf).get();
-        } catch (InterruptedException | ExecutionException e) {
-            log.warn("Exception while reading meters from {}: {}", deviceId, e.toString());
-            log.debug("", e);
-            return CompletableFuture.completedFuture(Collections.emptyList());
-        }
+        piMeterCellConfigs = client.read(pipeconf)
+                .meterCells(meterIds).submitSync().all(PiMeterCellConfig.class);
 
         Collection<Meter> meters = piMeterCellConfigs.stream()
                 .map(p -> {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMulticastGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMulticastGroupProgrammable.java
index ecdda08..fcb578d 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMulticastGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMulticastGroupProgrammable.java
@@ -42,9 +42,9 @@
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 
-import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.DELETE;
-import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.INSERT;
-import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.MODIFY;
+import static org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType.DELETE;
+import static org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType.INSERT;
+import static org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType.MODIFY;
 
 /**
  * Implementation of GroupProgrammable to handle multicast groups in P4Runtime.
@@ -52,6 +52,8 @@
 public class P4RuntimeMulticastGroupProgrammable
         extends AbstractP4RuntimeHandlerBehaviour implements GroupProgrammable {
 
+    // TODO: implement reading groups from device and mirror sync.
+
     // Needed to synchronize operations over the same group.
     private static final Striped<Lock> STRIPED_LOCKS = Striped.lock(30);
 
@@ -92,7 +94,7 @@
     }
 
     private Collection<Group> getMcGroups() {
-        // TODO: missing support for reading multicast groups is ready in PI/Stratum.
+        // TODO: missing support for reading multicast groups in PI/Stratum.
         return getMcGroupsFromMirror();
     }
 
@@ -160,17 +162,15 @@
         }
     }
 
-    private boolean writeMcGroupOnDevice(PiMulticastGroupEntry group, P4RuntimeClient.WriteOperationType opType) {
-        return getFutureWithDeadline(
-                client.writePreMulticastGroupEntries(
-                        Collections.singletonList(group), opType),
-                "performing multicast group " + opType, false);
+    private boolean writeMcGroupOnDevice(
+            PiMulticastGroupEntry group, P4RuntimeClient.UpdateType opType) {
+        return client.write(pipeconf).entity(group, opType).submitSync().isSuccess();
     }
 
     private boolean mcGroupApply(PiMulticastGroupEntryHandle handle,
                                  PiMulticastGroupEntry piGroup,
                                  Group pdGroup,
-                                 P4RuntimeClient.WriteOperationType opType) {
+                                 P4RuntimeClient.UpdateType opType) {
         switch (opType) {
             case DELETE:
                 if (writeMcGroupOnDevice(piGroup, DELETE)) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
index 7122784..73acf98 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
@@ -47,9 +47,7 @@
             Collection<PiPacketOperation> operations = interpreter.mapOutboundPacket(packet);
             operations.forEach(piPacketOperation -> {
                 log.debug("Doing PiPacketOperation {}", piPacketOperation);
-                getFutureWithDeadline(
-                        client.packetOut(piPacketOperation, pipeconf),
-                        "sending packet-out", false);
+                client.packetOut(piPacketOperation, pipeconf);
             });
         } catch (PiPipelineInterpreter.PiInterpreterException e) {
             log.error("Unable to translate outbound packet for {} with pipeconf {}: {}",
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
index e1fc1e9..fb9451c 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
@@ -23,10 +23,13 @@
 import org.onosproject.net.Annotations;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.pi.runtime.PiEntity;
+import org.onosproject.net.pi.runtime.PiEntityType;
 import org.onosproject.net.pi.runtime.PiHandle;
 import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
 import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
 import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.WallClockTimestamp;
@@ -66,26 +69,38 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected PiPipeconfWatchdogService pipeconfWatchdogService;
 
-    private EventuallyConsistentMap<H, TimedEntry<E>> mirrorMap;
+    private EventuallyConsistentMap<PiHandle, TimedEntry<E>> mirrorMap;
+    private EventuallyConsistentMap<PiHandle, Annotations> annotationsMap;
 
-    private EventuallyConsistentMap<H, Annotations> annotationsMap;
+    private final PiEntityType entityType;
 
     private final PiPipeconfWatchdogListener pipeconfListener =
             new InternalPipeconfWatchdogListener();
 
+    AbstractDistributedP4RuntimeMirror(PiEntityType entityType) {
+        this.entityType = entityType;
+    }
+
     @Activate
     public void activate() {
+        // Locale.ROOT keeps the map name independent of the JVM default locale.
+        final String mapName = "onos-p4runtime-mirror-" + entityType.name().toLowerCase(java.util.Locale.ROOT);
+        final KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(TimedEntry.class)
+                .build();
+
         mirrorMap = storageService
-                .<H, TimedEntry<E>>eventuallyConsistentMapBuilder()
-                .withName(mapName())
-                .withSerializer(storeSerializer())
+                .<PiHandle, TimedEntry<E>>eventuallyConsistentMapBuilder()
+                .withName(mapName)
+                .withSerializer(serializer)
                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
                 .build();
 
         annotationsMap = storageService
-                .<H, Annotations>eventuallyConsistentMapBuilder()
-                .withName(mapName() + "-annotations")
-                .withSerializer(storeSerializer())
+                .<PiHandle, Annotations>eventuallyConsistentMapBuilder()
+                .withName(mapName + "-annotations")
+                .withSerializer(serializer)
                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
                 .build();
 
@@ -93,10 +108,6 @@
         log.info("Started");
     }
 
-    abstract String mapName();
-
-    abstract KryoNamespace storeSerializer();
-
     @Deactivate
     public void deactivate() {
         pipeconfWatchdogService.removeListener(pipeconfListener);
@@ -158,9 +169,12 @@
     }
 
     @Override
-    public void sync(DeviceId deviceId, Map<H, E> deviceState) {
+    @SuppressWarnings("unchecked")
+    public void sync(DeviceId deviceId, Collection<E> entities) {
         checkNotNull(deviceId);
-        final Map<H, E> localState = deviceHandleMap(deviceId);
+        final Map<PiHandle, E> deviceState = entities.stream() // on duplicate handles the last entity wins
+                .collect(Collectors.toMap(e -> e.handle(deviceId), e -> e, (first, last) -> last));
+        final Map<PiHandle, E> localState = deviceHandleMap(deviceId);
 
         final AtomicInteger removeCount = new AtomicInteger(0);
         final AtomicInteger updateCount = new AtomicInteger(0);
@@ -172,7 +186,7 @@
                     final E entryToAdd = deviceState.get(deviceHandle);
                     log.debug("Adding mirror entry for {}: {}",
                               deviceId, entryToAdd);
-                    put(deviceHandle, entryToAdd);
+                    put((H) deviceHandle, entryToAdd);
                     addCount.incrementAndGet();
                 });
         // Update or remove local entries.
@@ -181,12 +195,12 @@
             final E deviceEntry = deviceState.get(localHandle);
             if (deviceEntry == null) {
                 log.debug("Removing mirror entry for {}: {}", deviceId, localEntry);
-                remove(localHandle);
+                remove((H) localHandle);
                 removeCount.incrementAndGet();
             } else if (!deviceEntry.equals(localEntry)) {
                 log.debug("Updating mirror entry for {}: {}-->{}",
                           deviceId, localEntry, deviceEntry);
-                put(localHandle, deviceEntry);
+                put((H) localHandle, deviceEntry);
                 updateCount.incrementAndGet();
             }
         });
@@ -196,27 +210,48 @@
         }
     }
 
-    private Set<H> getHandlesForDevice(DeviceId deviceId) {
+    private Set<PiHandle> getHandlesForDevice(DeviceId deviceId) {
         return mirrorMap.keySet().stream()
                 .filter(h -> h.deviceId().equals(deviceId))
                 .collect(Collectors.toSet());
     }
 
-    @Override
-    public Map<H, E> deviceHandleMap(DeviceId deviceId) {
-        final Map<H, E> deviceMap = Maps.newHashMap();
+    private Map<PiHandle, E> deviceHandleMap(DeviceId deviceId) {
+        final Map<PiHandle, E> deviceMap = Maps.newHashMap();
         mirrorMap.entrySet().stream()
                 .filter(e -> e.getKey().deviceId().equals(deviceId))
                 .forEach(e -> deviceMap.put(e.getKey(), e.getValue().entry()));
         return deviceMap;
     }
 
+    // Removes from this mirror every entry whose handle belongs to the given device.
     private void removeAll(DeviceId deviceId) {
         checkNotNull(deviceId);
-        Collection<H> handles = getHandlesForDevice(deviceId);
+        @SuppressWarnings("unchecked") // NOTE(review): assumes every mirrorMap key is an H - confirm
+        Collection<H> handles = (Collection<H>) getHandlesForDevice(deviceId);
         handles.forEach(this::remove);
     }
 
+    @Override
+    @SuppressWarnings("unchecked") // covers the (H) and (E) casts in the lambda below
+    public void replayWriteResponse(P4RuntimeWriteClient.WriteResponse response) {
+        response.success().stream() // skip anything that is not a success for this mirror's entity type
+                .filter(r -> r.entityType().equals(this.entityType) && r.isSuccess())
+                .forEach(r -> {
+                    switch (r.updateType()) {
+                        case INSERT:
+                        case MODIFY: // INSERT falls through: both end up in put()
+                            put((H) r.handle(), (E) r.entity());
+                            break;
+                        case DELETE:
+                            remove((H) r.handle());
+                            break;
+                        default: // any other update type is only logged, the mirror is left untouched
+                            log.error("Unknown update type {}", r.updateType());
+                    }
+                });
+    }
+
     public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
         @Override
         public void event(PiPipeconfWatchdogEvent event) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileGroupMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileGroupMirror.java
index ef2d6bd..9d5d4b1 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileGroupMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileGroupMirror.java
@@ -16,10 +16,9 @@
 
 package org.onosproject.drivers.p4runtime.mirror;
 
-import org.onlab.util.KryoNamespace;
 import org.onosproject.net.pi.runtime.PiActionProfileGroup;
 import org.onosproject.net.pi.runtime.PiActionProfileGroupHandle;
-import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.net.pi.runtime.PiEntityType;
 import org.osgi.service.component.annotations.Component;
 
 /**
@@ -31,18 +30,7 @@
         <PiActionProfileGroupHandle, PiActionProfileGroup>
         implements P4RuntimeActionProfileGroupMirror {
 
-    private static final String DIST_MAP_NAME = "onos-p4runtime-act-prof-group-mirror";
-
-    @Override
-    String mapName() {
-        return DIST_MAP_NAME;
-    }
-
-    @Override
-    KryoNamespace storeSerializer() {
-        return KryoNamespace.newBuilder()
-                .register(KryoNamespaces.API)
-                .register(TimedEntry.class)
-                .build();
+    public DistributedP4RuntimeActionProfileGroupMirror() {
+        super(PiEntityType.ACTION_PROFILE_GROUP);
     }
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileMemberMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileMemberMirror.java
index 5b2ff21..f84006b 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileMemberMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeActionProfileMemberMirror.java
@@ -16,10 +16,9 @@
 
 package org.onosproject.drivers.p4runtime.mirror;
 
-import org.onlab.util.KryoNamespace;
 import org.onosproject.net.pi.runtime.PiActionProfileMember;
 import org.onosproject.net.pi.runtime.PiActionProfileMemberHandle;
-import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.net.pi.runtime.PiEntityType;
 import org.osgi.service.component.annotations.Component;
 
 /**
@@ -31,18 +30,7 @@
         <PiActionProfileMemberHandle, PiActionProfileMember>
         implements P4RuntimeActionProfileMemberMirror {
 
-    private static final String DIST_MAP_NAME = "onos-p4runtime-act-prof-member-mirror";
-
-    @Override
-    String mapName() {
-        return DIST_MAP_NAME;
-    }
-
-    @Override
-    KryoNamespace storeSerializer() {
-        return KryoNamespace.newBuilder()
-                .register(KryoNamespaces.API)
-                .register(TimedEntry.class)
-                .build();
+    public DistributedP4RuntimeActionProfileMemberMirror() {
+        super(PiEntityType.ACTION_PROFILE_MEMBER);
     }
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeMeterMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeMeterMirror.java
index bf36274..581c83a 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeMeterMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeMeterMirror.java
@@ -16,10 +16,9 @@
 
 package org.onosproject.drivers.p4runtime.mirror;
 
-import org.onlab.util.KryoNamespace;
+import org.onosproject.net.pi.runtime.PiEntityType;
 import org.onosproject.net.pi.runtime.PiMeterCellConfig;
-import org.onosproject.net.pi.runtime.PiMeterHandle;
-import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.net.pi.runtime.PiMeterCellHandle;
 import org.osgi.service.component.annotations.Component;
 
 /**
@@ -28,21 +27,10 @@
 @Component(immediate = true, service = P4RuntimeMeterMirror.class)
 public final class DistributedP4RuntimeMeterMirror
         extends AbstractDistributedP4RuntimeMirror
-        <PiMeterHandle, PiMeterCellConfig>
+        <PiMeterCellHandle, PiMeterCellConfig>
         implements P4RuntimeMeterMirror {
 
-    private static final String DIST_MAP_NAME = "onos-p4runtime-meter-mirror";
-
-    @Override
-    String mapName() {
-        return DIST_MAP_NAME;
+    public DistributedP4RuntimeMeterMirror() {
+        super(PiEntityType.METER_CELL_CONFIG);
     }
-
-    @Override
-    KryoNamespace storeSerializer() {
-        return KryoNamespace.newBuilder()
-                .register(KryoNamespaces.API)
-                .register(TimedEntry.class)
-                .build();
-    }
-}
\ No newline at end of file
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeMulticastGroupMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeMulticastGroupMirror.java
index 83c23d8..5ccaec5 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeMulticastGroupMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeMulticastGroupMirror.java
@@ -16,10 +16,9 @@
 
 package org.onosproject.drivers.p4runtime.mirror;
 
-import org.onlab.util.KryoNamespace;
+import org.onosproject.net.pi.runtime.PiEntityType;
 import org.onosproject.net.pi.runtime.PiMulticastGroupEntry;
 import org.onosproject.net.pi.runtime.PiMulticastGroupEntryHandle;
-import org.onosproject.store.serializers.KryoNamespaces;
 import org.osgi.service.component.annotations.Component;
 
 /**
@@ -31,18 +30,7 @@
                         <PiMulticastGroupEntryHandle, PiMulticastGroupEntry>
         implements P4RuntimeMulticastGroupMirror {
 
-    private static final String DIST_MAP_NAME = "onos-p4runtime-mc-group-mirror";
-
-    @Override
-    String mapName() {
-        return DIST_MAP_NAME;
-    }
-
-    @Override
-    KryoNamespace storeSerializer() {
-        return KryoNamespace.newBuilder()
-                .register(KryoNamespaces.API)
-                .register(TimedEntry.class)
-                .build();
+    public DistributedP4RuntimeMulticastGroupMirror() {
+        super(PiEntityType.PRE_MULTICAST_GROUP_ENTRY);
     }
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeTableMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeTableMirror.java
index 320b3da..4f5235f 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeTableMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimeTableMirror.java
@@ -16,10 +16,9 @@
 
 package org.onosproject.drivers.p4runtime.mirror;
 
-import org.onlab.util.KryoNamespace;
+import org.onosproject.net.pi.runtime.PiEntityType;
 import org.onosproject.net.pi.runtime.PiTableEntry;
 import org.onosproject.net.pi.runtime.PiTableEntryHandle;
-import org.onosproject.store.serializers.KryoNamespaces;
 import org.osgi.service.component.annotations.Component;
 
 /**
@@ -31,18 +30,7 @@
                         <PiTableEntryHandle, PiTableEntry>
         implements P4RuntimeTableMirror {
 
-    private static final String DIST_MAP_NAME = "onos-p4runtime-table-mirror";
-
-    @Override
-    String mapName() {
-        return DIST_MAP_NAME;
-    }
-
-    @Override
-    KryoNamespace storeSerializer() {
-        return KryoNamespace.newBuilder()
-                .register(KryoNamespaces.API)
-                .register(TimedEntry.class)
-                .build();
+    public DistributedP4RuntimeTableMirror() {
+        super(PiEntityType.TABLE_ENTRY);
     }
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMeterMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMeterMirror.java
index 668492a..ee88c6a 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMeterMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMeterMirror.java
@@ -18,12 +18,12 @@
 
 import com.google.common.annotations.Beta;
 import org.onosproject.net.pi.runtime.PiMeterCellConfig;
-import org.onosproject.net.pi.runtime.PiMeterHandle;
+import org.onosproject.net.pi.runtime.PiMeterCellHandle;
 
 /**
  * Mirror of meters installed on a P4Runtime device.
  */
 @Beta
 public interface P4RuntimeMeterMirror
-        extends P4RuntimeMirror<PiMeterHandle, PiMeterCellConfig> {
+        extends P4RuntimeMirror<PiMeterCellHandle, PiMeterCellConfig> {
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
index d62bbb8..bee8c51 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
@@ -21,9 +21,9 @@
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.pi.runtime.PiEntity;
 import org.onosproject.net.pi.runtime.PiHandle;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
 
 import java.util.Collection;
-import java.util.Map;
 
 /**
  * Service to keep track of the device state for a given class of PI entities.
@@ -75,15 +75,6 @@
     void remove(H handle);
 
     /**
-     * Returns a map of handles and corresponding PI entities for the given
-     * device.
-     *
-     * @param deviceId device ID
-     * @return map of handles and corresponding PI entities
-     */
-    Map<H, E> deviceHandleMap(DeviceId deviceId);
-
-    /**
      * Stores the given annotations associating it to the given handle.
      *
      * @param handle      handle
@@ -101,10 +92,18 @@
     Annotations annotations(H handle);
 
     /**
-     * Synchronizes the state of the given device ID with the given handle map.
-     *
-     * @param deviceId  device ID
-     * @param handleMap handle map
+     * Synchronizes the state of the given device ID with the given PI entities.
+     *
+     * @param deviceId device ID
+     * @param entities collection of PI entities
      */
-    void sync(DeviceId deviceId, Map<H, E> handleMap);
+    void sync(DeviceId deviceId, Collection<E> entities);
+
+    /**
+     * Uses the given P4Runtime write response to update the state of this
+     * mirror.
+     *
+     * @param response P4Runtime write response
+     */
+    void replayWriteResponse(P4RuntimeWriteClient.WriteResponse response);
 }