Improve scalability of P4Runtime subsystem

The P4Runtime client was hanging (deadlock) on a master arbitration
request. As such, all other requests (e.g. table write) were waiting
for the client's request lock to become available.

Apart from fixing those deadlocks, this patch brings a number of
improvements that together make it possible to run networks of 100+
P4Runtime devices on a single ONOS instance (previously only ~20).

Includes:
- Asynchronous mastership handling in DeviceHandshaker (as defined in
the P4Runtime and OpenFlow specs)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspected deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Re-purposed ChannelEvent to DeviceAgentEvent to carry also mastership
response events
- Fixed IntelliJ warnings
- Various code and log clean-ups

Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
index 8e2b54d..ac4856a 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
@@ -16,11 +16,13 @@
 
 package org.onosproject.drivers.p4runtime;
 
+import io.grpc.StatusRuntimeException;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
 import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipeconfId;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.net.pi.service.PiTranslationService;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
@@ -28,6 +30,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -35,6 +42,10 @@
  */
 public class AbstractP4RuntimeHandlerBehaviour extends AbstractHandlerBehaviour {
 
+    // Timeout in seconds for device operations.
+    // TODO make configurable via driver properties
+    private static final int DEVICE_OP_TIMEOUT = 5;
+
     public static final String P4RUNTIME_SERVER_ADDR_KEY = "p4runtime_ip";
     public static final String P4RUNTIME_SERVER_PORT_KEY = "p4runtime_port";
     public static final String P4RUNTIME_DEVICE_ID_KEY = "p4runtime_deviceId";
@@ -76,12 +87,18 @@
         client = controller.getClient(deviceId);
 
         PiPipeconfService piPipeconfService = handler().get(PiPipeconfService.class);
-        if (!piPipeconfService.ofDevice(deviceId).isPresent() ||
-                !piPipeconfService.getPipeconf(piPipeconfService.ofDevice(deviceId).get()).isPresent()) {
-            log.warn("Unable to get the pipeconf of {}, aborting operation", deviceId);
+        if (!piPipeconfService.ofDevice(deviceId).isPresent()) {
+            log.warn("Unable to get assigned pipeconf for {} (mapping " +
+                             "missing in PiPipeconfService), aborting operation",
+                     deviceId);
             return false;
         }
-        pipeconf = piPipeconfService.getPipeconf(piPipeconfService.ofDevice(deviceId).get()).get();
+        PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).get();
+        if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
+            log.warn("Cannot find any pipeconf with ID '{}' ({}), aborting operation", pipeconfId, deviceId);
+            return false;
+        }
+        pipeconf = piPipeconfService.getPipeconf(pipeconfId).get();
 
         piTranslationService = handler().get(PiTranslationService.class);
 
@@ -89,12 +106,12 @@
     }
 
     /**
-     * Create a P4Runtime client for this device. Returns true if the operation
-     * was successful, false otherwise.
+     * Returns a P4Runtime client for this device, null if such client cannot
+     * be created.
      *
-     * @return true if successful, false otherwise
+     * @return client or null
      */
-    protected boolean createClient() {
+     P4RuntimeClient createClient() {
         deviceId = handler().data().deviceId();
         controller = handler().get(P4RuntimeController.class);
 
@@ -105,24 +122,38 @@
         if (serverAddr == null || serverPortString == null || p4DeviceIdString == null) {
             log.warn("Unable to create client for {}, missing driver data key (required is {}, {}, and {})",
                      deviceId, P4RUNTIME_SERVER_ADDR_KEY, P4RUNTIME_SERVER_PORT_KEY, P4RUNTIME_DEVICE_ID_KEY);
-            return false;
+            return null;
         }
 
-        if (!controller.createClient(deviceId, serverAddr,
-                                     Integer.parseUnsignedInt(serverPortString),
-                                     Long.parseUnsignedLong(p4DeviceIdString))) {
+         final int serverPort;
+         final long p4DeviceId;
+
+         try {
+            serverPort = Integer.parseUnsignedInt(serverPortString);
+        } catch (NumberFormatException e) {
+            log.error("{} is not a valid P4Runtime port number", serverPortString);
+            return null;
+        }
+         try {
+             p4DeviceId = Long.parseUnsignedLong(p4DeviceIdString);
+         } catch (NumberFormatException e) {
+             log.error("{} is not a valid P4Runtime-internal device ID", p4DeviceIdString);
+             return null;
+         }
+
+        if (!controller.createClient(deviceId, serverAddr, serverPort, p4DeviceId)) {
             log.warn("Unable to create client for {}, aborting operation", deviceId);
-            return false;
+            return null;
         }
 
-        return true;
+        return controller.getClient(deviceId);
     }
 
     /**
-     * Returns the value of the given driver property, if present,
-     * otherwise returns the given default value.
+     * Returns the value of the given driver property, if present, otherwise
+     * returns the given default value.
      *
-     * @param propName property name
+     * @param propName   property name
      * @param defaultVal default value
      * @return boolean
      */
@@ -134,4 +165,36 @@
             return Boolean.parseBoolean(handler().driver().getProperty(propName));
         }
     }
+
+    /**
+     * Convenience method to get the result of a completable future while
+     * setting a timeout and checking for exceptions.
+     *
+     * @param future        completable future
+     * @param opDescription operation description to use in log messages. Should
+     *                      be a sentence starting with a verb ending in -ing,
+     *                      e.g. "reading...", "writing...", etc.
+     * @param defaultValue  value to return if operation fails
+     * @param <U>           type of returned value
+     * @return future result or default value
+     */
+    <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
+                                U defaultValue) {
+        try {
+            return future.get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.error("Exception while {} on {}", opDescription, deviceId);
+        } catch (ExecutionException e) {
+            final Throwable cause = e.getCause();
+            if (cause instanceof StatusRuntimeException) {
+                final StatusRuntimeException grpcError = (StatusRuntimeException) cause;
+                log.warn("Error while {} on {}: {}", opDescription, deviceId, grpcError.getMessage());
+            } else {
+                log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
+            }
+        } catch (TimeoutException e) {
+            log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
+        }
+        return defaultValue;
+    }
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
index 6a6c2e9..cfe717d 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
@@ -18,9 +18,8 @@
 
 import org.onlab.util.SharedExecutors;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
-import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.behaviour.PiPipelineProgrammable;
+import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.slf4j.Logger;
@@ -28,14 +27,14 @@
 import java.nio.ByteBuffer;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Abstract implementation of the PiPipelineProgrammable behaviours for a P4Runtime device.
  */
-public abstract class AbstractP4RuntimePipelineProgrammable extends AbstractHandlerBehaviour
+public abstract class AbstractP4RuntimePipelineProgrammable
+        extends AbstractP4RuntimeHandlerBehaviour
         implements PiPipelineProgrammable {
 
     protected final Logger log = getLogger(getClass());
@@ -72,25 +71,31 @@
             return false;
         }
 
-        try {
-            if (!client.initStreamChannel().get()) {
-                log.warn("Unable to init stream channel to {}.", deviceId);
-                return false;
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            log.error("Exception while initializing stream channel on {}", deviceId, e);
+        // We need to be master to perform write RPC to the device.
+        // FIXME: properly perform mastership handling in the device provider
+        // This would probably mean deploying the pipeline after the device has
+        // been notified to the core.
+        final Boolean masterSuccess = getFutureWithDeadline(
+                client.becomeMaster(),
+                "becoming master ", null);
+        if (masterSuccess == null) {
+            // Error already logged by getFutureWithDeadline()
+            return false;
+        } else if (!masterSuccess) {
+            log.warn("Unable to become master for {}, aborting pipeconf deploy", deviceId);
             return false;
         }
 
-        try {
-            if (!client.setPipelineConfig(pipeconf, deviceDataBuffer).get()) {
-                log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId);
-                return false;
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            log.error("Exception while deploying pipeconf to {}", deviceId, e);
+        final Boolean deploySuccess = getFutureWithDeadline(
+                client.setPipelineConfig(pipeconf, deviceDataBuffer),
+                "deploying pipeconf", null);
+        if (deploySuccess == null) {
+            return false;
+        } else if (!deploySuccess) {
+            log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId);
             return false;
         }
+
         return true;
     }
 
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index 00fe9380..2b941cb 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -21,7 +21,6 @@
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import io.grpc.StatusRuntimeException;
 import org.onlab.util.SharedExecutors;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeTableMirror;
 import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
@@ -32,7 +31,6 @@
 import org.onosproject.net.pi.model.PiPipelineInterpreter;
 import org.onosproject.net.pi.model.PiPipelineModel;
 import org.onosproject.net.pi.model.PiTableId;
-import org.onosproject.net.pi.model.PiTableModel;
 import org.onosproject.net.pi.runtime.PiCounterCellData;
 import org.onosproject.net.pi.runtime.PiCounterCellId;
 import org.onosproject.net.pi.runtime.PiTableEntry;
@@ -48,7 +46,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -110,7 +108,6 @@
                 }
             });
     private PiPipelineModel pipelineModel;
-    private PiPipelineInterpreter interpreter;
     private P4RuntimeTableMirror tableMirror;
     private PiFlowRuleTranslator translator;
 
@@ -125,7 +122,6 @@
             log.warn("Unable to get interpreter of {}", deviceId);
             return false;
         }
-        interpreter = device.as(PiPipelineInterpreter.class);
         pipelineModel = pipeconf.pipelineModel();
         tableMirror = handler().get(P4RuntimeTableMirror.class);
         translator = piTranslationService.flowRuleTranslator();
@@ -146,49 +142,35 @@
         final ImmutableList.Builder<FlowEntry> result = ImmutableList.builder();
         final List<PiTableEntry> inconsistentEntries = Lists.newArrayList();
 
-        for (PiTableModel tableModel : pipelineModel.tables()) {
+        // Read table entries.
+        final Collection<PiTableEntry> installedEntries;
+        // TODO: ONOS-7596 read counters with table entries
+        installedEntries = getFutureWithDeadline(client.dumpAllTables(pipeconf),
+                                                 "dumping tables", Collections.emptyList());
 
-            final PiTableId piTableId = tableModel.id();
+        if (installedEntries.isEmpty()) {
+            return Collections.emptyList();
+        }
 
-            // Read table entries.
-            final Collection<PiTableEntry> installedEntries;
-            try {
-                // TODO: optimize by dumping entries and counters in parallel
-                // From ALL tables with the same request.
-                installedEntries = client.dumpTable(piTableId, pipeconf).get();
-            } catch (InterruptedException | ExecutionException e) {
-                if (!(e.getCause() instanceof StatusRuntimeException)) {
-                    // gRPC errors are logged in the client.
-                    log.error("Exception while dumping table {} of {}",
-                              piTableId, deviceId, e);
-                }
-                continue; // next table
-            }
+        // Read table direct counters (if any).
+        final Map<PiTableEntry, PiCounterCellData> counterCellMap =
+                readEntryCounters(installedEntries);
 
-            if (installedEntries.size() == 0) {
-                continue; // next table
-            }
+        // Forge flow entries with counter values.
+        for (PiTableEntry installedEntry : installedEntries) {
 
-            // Read table direct counters (if any).
-            final Map<PiTableEntry, PiCounterCellData> counterCellMap =
-                    readEntryCounters(installedEntries);
+            final FlowEntry flowEntry = forgeFlowEntry(
+                    installedEntry, counterCellMap.get(installedEntry));
 
-            // Forge flow entries with counter values.
-            for (PiTableEntry installedEntry : installedEntries) {
-
-                final FlowEntry flowEntry = forgeFlowEntry(
-                        installedEntry, counterCellMap.get(installedEntry));
-
-                if (flowEntry == null) {
-                    // Entry is on device but unknown to translation service or
-                    // device mirror. Inconsistent. Mark for removal.
-                    // TODO: make this behaviour configurable
-                    // In some cases it's fine for the device to have rules
-                    // that were not installed by us.
-                    inconsistentEntries.add(installedEntry);
-                } else {
-                    result.add(flowEntry);
-                }
+            if (flowEntry == null) {
+                // Entry is on device but unknown to translation service or
+                // device mirror. Inconsistent. Mark for removal.
+                // TODO: make this behaviour configurable
+                // In some cases it's fine for the device to have rules
+                // that were not installed by us.
+                inconsistentEntries.add(installedEntry);
+            } else {
+                result.add(flowEntry);
             }
         }
 
@@ -354,19 +336,19 @@
      */
     private boolean writeEntry(PiTableEntry entry,
                                WriteOperationType p4Operation) {
-        try {
-            if (client.writeTableEntries(
-                    newArrayList(entry), p4Operation, pipeconf).get()) {
-                return true;
-            } else {
-                log.warn("Unable to {} table entry in {}: {}",
-                         p4Operation.name(), deviceId, entry);
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            log.warn("Exception while performing {} table entry operation:",
-                     p4Operation, e);
+        final CompletableFuture<Boolean> future = client.writeTableEntries(
+                newArrayList(entry), p4Operation, pipeconf);
+        final Boolean success = getFutureWithDeadline(
+                future, "performing table " + p4Operation.name(), null);
+        if (success == null) {
+            // Error logged by getFutureWithDeadline();
+            return false;
         }
-        return false;
+        if (!success) {
+            log.warn("Unable to {} table entry in {}: {}",
+                     p4Operation.name(), deviceId, entry);
+        }
+        return success;
     }
 
     private void updateStores(PiTableEntryHandle handle,
@@ -392,36 +374,28 @@
     private Map<PiTableEntry, PiCounterCellData> readEntryCounters(
             Collection<PiTableEntry> tableEntries) {
         if (!driverBoolProperty(SUPPORT_TABLE_COUNTERS,
-                                DEFAULT_SUPPORT_TABLE_COUNTERS)) {
+                                DEFAULT_SUPPORT_TABLE_COUNTERS)
+                || tableEntries.isEmpty()) {
             return Collections.emptyMap();
         }
 
         Collection<PiCounterCellData> cellDatas;
-        try {
-            if (driverBoolProperty(READ_ALL_DIRECT_COUNTERS,
-                                   DEFAULT_READ_ALL_DIRECT_COUNTERS)) {
-                // FIXME: re-implement reading all counters ONOS-7595, or
-                // (even better) read counters when dumping table entries ONOS-7596
-                // cellDatas = client.readAllCounterCells(
-                //        singleton(counterId), pipeconf).get();
-                cellDatas = Collections.emptyList();
-            } else {
-                Set<PiCounterCellId> cellIds = tableEntries.stream()
-                        .filter(e -> tableHasCounter(e.table()))
-                        .map(PiCounterCellId::ofDirect)
-                        .collect(Collectors.toSet());
-                cellDatas = client.readCounterCells(cellIds, pipeconf).get();
-            }
-            return cellDatas.stream()
-                    .collect(Collectors.toMap(c -> c.cellId().tableEntry(), c -> c));
-        } catch (InterruptedException | ExecutionException e) {
-            if (!(e.getCause() instanceof StatusRuntimeException)) {
-                // gRPC errors are logged in the client.
-                log.error("Exception while reading table counters from {}: {}",
-                          deviceId, e);
-            }
-            return Collections.emptyMap();
+
+        if (driverBoolProperty(READ_ALL_DIRECT_COUNTERS,
+                               DEFAULT_READ_ALL_DIRECT_COUNTERS)) {
+            // FIXME: read counters when dumping table entries ONOS-7596
+            cellDatas = Collections.emptyList();
+        } else {
+            Set<PiCounterCellId> cellIds = tableEntries.stream()
+                    .filter(e -> tableHasCounter(e.table()))
+                    .map(PiCounterCellId::ofDirect)
+                    .collect(Collectors.toSet());
+            cellDatas = getFutureWithDeadline(client.readCounterCells(cellIds, pipeconf),
+                                              "reading table counters", Collections.emptyList());
         }
+        return cellDatas.stream()
+                .collect(Collectors.toMap(c -> c.cellId().tableEntry(), c -> c));
+
     }
 
     private boolean tableHasCounter(PiTableId tableId) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
index dc2ec69..02c0fa3 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
@@ -42,12 +42,12 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.lang.String.format;
 import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.DELETE;
 import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.INSERT;
 import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.MODIFY;
@@ -226,7 +226,7 @@
 
         if (!membersToRemove.isEmpty() &&
                 !completeFuture(client.writeActionGroupMembers(groupProfileId, membersToRemove, DELETE, pipeconf),
-                               ACT_GRP_MEMS_STR, DELETE_STR)) {
+                                ACT_GRP_MEMS_STR, DELETE_STR)) {
             // add what we removed
             completeFuture(client.writeActionGroupMembers(groupProfileId, membersToRemove, INSERT, pipeconf),
                            ACT_GRP_MEMS_STR, DELETE_STR);
@@ -271,31 +271,18 @@
 
     private boolean completeFuture(CompletableFuture<Boolean> completableFuture,
                                    String topic, String action) {
-        try {
-            if (completableFuture.get()) {
-                return true;
-            } else {
-                log.warn("Unable to {} {}", action, topic);
-                return false;
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            log.warn("Exception while performing {} {}: {}", action, topic, e.getMessage());
-            log.debug("Exception", e);
-            return false;
-        }
+        return getFutureWithDeadline(
+                completableFuture, format("performing %s %s", action, topic), false);
     }
 
     private Stream<Group> streamGroupsFromDevice(PiActionProfileId actProfId) {
-        try {
-            // Read PI groups and return original PD one.
-            return client.dumpGroups(actProfId, pipeconf).get().stream()
-                    .map(this::forgeGroupEntry)
-                    .filter(Objects::nonNull);
-        } catch (ExecutionException | InterruptedException e) {
-            log.error("Exception while dumping groups from action profile '{}' on {}: {}",
-                      actProfId.id(), deviceId, e);
-            return Stream.empty();
-        }
+        // Read PI groups and return original PD one.
+        Collection<PiActionGroup> groups = getFutureWithDeadline(
+                client.dumpGroups(actProfId, pipeconf),
+                "dumping groups", Collections.emptyList());
+        return groups.stream()
+                .map(this::forgeGroupEntry)
+                .filter(Objects::nonNull);
     }
 
     private Group forgeGroupEntry(PiActionGroup piGroup) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
index 58cab8a..23fcf1f 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
@@ -18,7 +18,7 @@
 
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
-import org.onosproject.net.device.ChannelListener;
+import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.device.DeviceHandshaker;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 
@@ -29,72 +29,60 @@
  */
 public class P4RuntimeHandshaker extends AbstractP4RuntimeHandlerBehaviour implements DeviceHandshaker {
 
-    // TODO: consider abstract class with empty connect method and  implementation into a protected one for reusability.
-
     @Override
     public CompletableFuture<Boolean> connect() {
-        return CompletableFuture.supplyAsync(this::doConnect);
-    }
-
-    private boolean doConnect() {
-        return super.createClient();
+        return CompletableFuture
+                .supplyAsync(super::createClient)
+                .thenCompose(client -> {
+                    if (client == null) {
+                        return CompletableFuture.completedFuture(false);
+                    }
+                    return client.start();
+                });
     }
 
     @Override
     public CompletableFuture<Boolean> disconnect() {
-        return CompletableFuture.supplyAsync(() -> {
-            P4RuntimeController controller = handler().get(P4RuntimeController.class);
-            DeviceId deviceId = handler().data().deviceId();
-            controller.removeClient(deviceId);
-            return true;
-        });
+        final P4RuntimeController controller = handler().get(P4RuntimeController.class);
+        final DeviceId deviceId = handler().data().deviceId();
+        if (!controller.hasClient(deviceId)) {
+            return CompletableFuture.completedFuture(true);
+        } else {
+            return controller.getClient(deviceId).shutdown()
+                    .thenApplyAsync(v -> {
+                        controller.removeClient(deviceId);
+                        return true;
+                    });
+        }
     }
 
     @Override
     public CompletableFuture<Boolean> isReachable() {
-        return CompletableFuture.supplyAsync(() -> {
-            P4RuntimeController controller = handler().get(P4RuntimeController.class);
-            DeviceId deviceId = handler().data().deviceId();
-            return controller.isReacheable(deviceId);
-        });
+        return CompletableFuture.supplyAsync(() -> handler()
+                .get(P4RuntimeController.class)
+                .isReacheable(handler().data().deviceId())
+        );
     }
 
     @Override
-    public CompletableFuture<MastershipRole> roleChanged(MastershipRole newRole) {
-        deviceId = handler().data().deviceId();
-        controller = handler().get(P4RuntimeController.class);
-        CompletableFuture<MastershipRole> result = new CompletableFuture<>();
-
-        client = controller.getClient(deviceId);
-        if (client == null || !controller.isReacheable(deviceId)) {
-            result.complete(MastershipRole.NONE);
-            return result;
-        }
-        if (newRole.equals(MastershipRole.MASTER)) {
-            client.sendMasterArbitrationUpdate().thenAcceptAsync(success -> {
-                if (!success) {
-                    log.warn("Device {} arbitration failed", deviceId);
-                    result.complete(MastershipRole.STANDBY);
-                } else {
-                    result.complete(MastershipRole.MASTER);
+    public void roleChanged(MastershipRole newRole) {
+        if (setupBehaviour() && newRole.equals(MastershipRole.MASTER)) {
+            client.becomeMaster().thenAcceptAsync(result -> {
+                if (!result) {
+                    log.error("Unable to notify mastership role {} to {}",
+                              newRole, deviceId);
                 }
             });
-        } else {
-            // Since we don't need to do anything, we can complete it directly
-            // Spec: The client with the highest election id is referred to as the
-            // "master", while all other clients are referred to as "slaves".
-            result.complete(newRole);
         }
-        return result;
     }
 
     @Override
-    public void addChannelListener(ChannelListener listener) {
-        controller.addChannelListener(deviceId, listener);
+    public void addDeviceAgentListener(DeviceAgentListener listener) {
+        controller.addDeviceAgentListener(deviceId, listener);
     }
 
     @Override
-    public void removeChannelListener(ChannelListener listener) {
-        controller.removeChannelListener(deviceId, listener);
+    public void removeDeviceAgentListener(DeviceAgentListener listener) {
+        controller.removeDeviceAgentListener(deviceId, listener);
     }
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
index d7a204e..39ed84d 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
@@ -16,9 +16,9 @@
 
 package org.onosproject.drivers.p4runtime;
 
-import com.google.common.cache.LoadingCache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMeterMirror;
 import org.onosproject.net.meter.Band;
 import org.onosproject.net.meter.DefaultBand;
@@ -37,11 +37,11 @@
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Set;
 import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
@@ -108,18 +108,13 @@
 
         final PiMeterHandle handle = PiMeterHandle.of(deviceId, piMeterCellConfig);
         ENTRY_LOCKS.getUnchecked(handle).lock();
-        boolean result = false;
-        try {
-            if (client.writeMeterCells(newArrayList(piMeterCellConfig), pipeconf).get()) {
-                meterMirror.put(handle, piMeterCellConfig);
-                result = true;
-            }
-
-        } catch (InterruptedException | ExecutionException e) {
-            log.warn("Exception while modify meter entry:", e);
-        } finally {
-            ENTRY_LOCKS.getUnchecked(handle).unlock();
+        final boolean result = getFutureWithDeadline(
+                client.writeMeterCells(newArrayList(piMeterCellConfig), pipeconf),
+                "writing meter cells", false);
+        if (result) {
+            meterMirror.put(handle, piMeterCellConfig);
         }
+        ENTRY_LOCKS.getUnchecked(handle).unlock();
 
         return result;
     }
@@ -162,4 +157,4 @@
 
         return CompletableFuture.completedFuture(meters);
     }
-}
\ No newline at end of file
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
index 6c61148..cc79cc1 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
@@ -26,7 +26,9 @@
 /**
  * Implementation of PacketProgrammable behaviour for P4Runtime.
  */
-public class P4RuntimePacketProgrammable extends AbstractP4RuntimeHandlerBehaviour implements PacketProgrammable {
+public class P4RuntimePacketProgrammable
+        extends AbstractP4RuntimeHandlerBehaviour
+        implements PacketProgrammable {
 
     @Override
     public void emit(OutboundPacket packet) {
@@ -38,7 +40,8 @@
         final PiPipelineInterpreter interpreter = device.is(PiPipelineInterpreter.class)
                 ? device.as(PiPipelineInterpreter.class) : null;
         if (interpreter == null) {
-            log.warn("Device {} with pipeconf {} has no interpreter, aborting emit operation", deviceId, pipeconf.id());
+            log.warn("Unable to get interpreter for {} with pipeconf {}, aborting emit operation",
+                     deviceId, pipeconf.id());
             return;
         }
 
@@ -46,11 +49,13 @@
             Collection<PiPacketOperation> operations = interpreter.mapOutboundPacket(packet);
             operations.forEach(piPacketOperation -> {
                 log.debug("Doing PiPacketOperation {}", piPacketOperation);
-                client.packetOut(piPacketOperation, pipeconf);
+                getFutureWithDeadline(
+                        client.packetOut(piPacketOperation, pipeconf),
+                        "sending packet-out", false);
             });
         } catch (PiPipelineInterpreter.PiInterpreterException e) {
-            log.error("Interpreter of pipeconf {} was unable to translate outbound packet: {}",
-                    pipeconf.id(), e.getMessage());
+            log.error("Unable to translate outbound packet for {} with pipeconf {}: {}",
+                      deviceId, pipeconf.id(), e.getMessage());
         }
     }
 }