Improve scalability of P4Runtime subsystem

The P4Runtime client was hanging (deadlock) on a master arbitration
request. As such, all other requests (e.g. table write) were waiting
for the client's request lock to become available.

Apart from fixing those deadlocks, this patch brings a number of
improvements that together make it possible to run networks of 100+
P4Runtime devices on a single ONOS instance (previously only ~20 devices).

Includes:
- Asynchronous mastership handling in DeviceHandshaker (as defined in
the P4Runtime and OpenFlow specs)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspected deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Re-purposed ChannelEvent as DeviceAgentEvent so that it also carries
mastership response events
- Fixed IntelliJ warnings
- Various code and log clean-ups

Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index 00fe9380..2b941cb 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -21,7 +21,6 @@
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import io.grpc.StatusRuntimeException;
 import org.onlab.util.SharedExecutors;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeTableMirror;
 import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
@@ -32,7 +31,6 @@
 import org.onosproject.net.pi.model.PiPipelineInterpreter;
 import org.onosproject.net.pi.model.PiPipelineModel;
 import org.onosproject.net.pi.model.PiTableId;
-import org.onosproject.net.pi.model.PiTableModel;
 import org.onosproject.net.pi.runtime.PiCounterCellData;
 import org.onosproject.net.pi.runtime.PiCounterCellId;
 import org.onosproject.net.pi.runtime.PiTableEntry;
@@ -48,7 +46,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -110,7 +108,6 @@
                 }
             });
     private PiPipelineModel pipelineModel;
-    private PiPipelineInterpreter interpreter;
     private P4RuntimeTableMirror tableMirror;
     private PiFlowRuleTranslator translator;
 
@@ -125,7 +122,6 @@
             log.warn("Unable to get interpreter of {}", deviceId);
             return false;
         }
-        interpreter = device.as(PiPipelineInterpreter.class);
         pipelineModel = pipeconf.pipelineModel();
         tableMirror = handler().get(P4RuntimeTableMirror.class);
         translator = piTranslationService.flowRuleTranslator();
@@ -146,49 +142,35 @@
         final ImmutableList.Builder<FlowEntry> result = ImmutableList.builder();
         final List<PiTableEntry> inconsistentEntries = Lists.newArrayList();
 
-        for (PiTableModel tableModel : pipelineModel.tables()) {
+        // Read table entries.
+        final Collection<PiTableEntry> installedEntries;
+        // TODO: ONOS-7596 read counters with table entries
+        installedEntries = getFutureWithDeadline(client.dumpAllTables(pipeconf),
+                                                 "dumping tables", Collections.emptyList());
 
-            final PiTableId piTableId = tableModel.id();
+        if (installedEntries.isEmpty()) {
+            return Collections.emptyList();
+        }
 
-            // Read table entries.
-            final Collection<PiTableEntry> installedEntries;
-            try {
-                // TODO: optimize by dumping entries and counters in parallel
-                // From ALL tables with the same request.
-                installedEntries = client.dumpTable(piTableId, pipeconf).get();
-            } catch (InterruptedException | ExecutionException e) {
-                if (!(e.getCause() instanceof StatusRuntimeException)) {
-                    // gRPC errors are logged in the client.
-                    log.error("Exception while dumping table {} of {}",
-                              piTableId, deviceId, e);
-                }
-                continue; // next table
-            }
+        // Read table direct counters (if any).
+        final Map<PiTableEntry, PiCounterCellData> counterCellMap =
+                readEntryCounters(installedEntries);
 
-            if (installedEntries.size() == 0) {
-                continue; // next table
-            }
+        // Forge flow entries with counter values.
+        for (PiTableEntry installedEntry : installedEntries) {
 
-            // Read table direct counters (if any).
-            final Map<PiTableEntry, PiCounterCellData> counterCellMap =
-                    readEntryCounters(installedEntries);
+            final FlowEntry flowEntry = forgeFlowEntry(
+                    installedEntry, counterCellMap.get(installedEntry));
 
-            // Forge flow entries with counter values.
-            for (PiTableEntry installedEntry : installedEntries) {
-
-                final FlowEntry flowEntry = forgeFlowEntry(
-                        installedEntry, counterCellMap.get(installedEntry));
-
-                if (flowEntry == null) {
-                    // Entry is on device but unknown to translation service or
-                    // device mirror. Inconsistent. Mark for removal.
-                    // TODO: make this behaviour configurable
-                    // In some cases it's fine for the device to have rules
-                    // that were not installed by us.
-                    inconsistentEntries.add(installedEntry);
-                } else {
-                    result.add(flowEntry);
-                }
+            if (flowEntry == null) {
+                // Entry is on device but unknown to translation service or
+                // device mirror. Inconsistent. Mark for removal.
+                // TODO: make this behaviour configurable
+                // In some cases it's fine for the device to have rules
+                // that were not installed by us.
+                inconsistentEntries.add(installedEntry);
+            } else {
+                result.add(flowEntry);
             }
         }
 
@@ -354,19 +336,19 @@
      */
     private boolean writeEntry(PiTableEntry entry,
                                WriteOperationType p4Operation) {
-        try {
-            if (client.writeTableEntries(
-                    newArrayList(entry), p4Operation, pipeconf).get()) {
-                return true;
-            } else {
-                log.warn("Unable to {} table entry in {}: {}",
-                         p4Operation.name(), deviceId, entry);
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            log.warn("Exception while performing {} table entry operation:",
-                     p4Operation, e);
+        final CompletableFuture<Boolean> future = client.writeTableEntries(
+                newArrayList(entry), p4Operation, pipeconf);
+        final Boolean success = getFutureWithDeadline(
+                future, "performing table " + p4Operation.name(), null);
+        if (success == null) {
+            // Error logged by getFutureWithDeadline();
+            return false;
         }
-        return false;
+        if (!success) {
+            log.warn("Unable to {} table entry in {}: {}",
+                     p4Operation.name(), deviceId, entry);
+        }
+        return success;
     }
 
     private void updateStores(PiTableEntryHandle handle,
@@ -392,36 +374,28 @@
     private Map<PiTableEntry, PiCounterCellData> readEntryCounters(
             Collection<PiTableEntry> tableEntries) {
         if (!driverBoolProperty(SUPPORT_TABLE_COUNTERS,
-                                DEFAULT_SUPPORT_TABLE_COUNTERS)) {
+                                DEFAULT_SUPPORT_TABLE_COUNTERS)
+                || tableEntries.isEmpty()) {
             return Collections.emptyMap();
         }
 
         Collection<PiCounterCellData> cellDatas;
-        try {
-            if (driverBoolProperty(READ_ALL_DIRECT_COUNTERS,
-                                   DEFAULT_READ_ALL_DIRECT_COUNTERS)) {
-                // FIXME: re-implement reading all counters ONOS-7595, or
-                // (even better) read counters when dumping table entries ONOS-7596
-                // cellDatas = client.readAllCounterCells(
-                //        singleton(counterId), pipeconf).get();
-                cellDatas = Collections.emptyList();
-            } else {
-                Set<PiCounterCellId> cellIds = tableEntries.stream()
-                        .filter(e -> tableHasCounter(e.table()))
-                        .map(PiCounterCellId::ofDirect)
-                        .collect(Collectors.toSet());
-                cellDatas = client.readCounterCells(cellIds, pipeconf).get();
-            }
-            return cellDatas.stream()
-                    .collect(Collectors.toMap(c -> c.cellId().tableEntry(), c -> c));
-        } catch (InterruptedException | ExecutionException e) {
-            if (!(e.getCause() instanceof StatusRuntimeException)) {
-                // gRPC errors are logged in the client.
-                log.error("Exception while reading table counters from {}: {}",
-                          deviceId, e);
-            }
-            return Collections.emptyMap();
+
+        if (driverBoolProperty(READ_ALL_DIRECT_COUNTERS,
+                               DEFAULT_READ_ALL_DIRECT_COUNTERS)) {
+            // FIXME: read counters when dumping table entries ONOS-7596
+            cellDatas = Collections.emptyList();
+        } else {
+            Set<PiCounterCellId> cellIds = tableEntries.stream()
+                    .filter(e -> tableHasCounter(e.table()))
+                    .map(PiCounterCellId::ofDirect)
+                    .collect(Collectors.toSet());
+            cellDatas = getFutureWithDeadline(client.readCounterCells(cellIds, pipeconf),
+                                              "reading table counters", Collections.emptyList());
         }
+        return cellDatas.stream()
+                .collect(Collectors.toMap(c -> c.cellId().tableEntry(), c -> c));
+
     }
 
     private boolean tableHasCounter(PiTableId tableId) {