Improve scalability of P4Runtime subsystem
The P4Runtime client was hanging (deadlock) on a master arbitration
request. As a result, all other requests (e.g. table write) were waiting
for the client's request lock to become available.
Apart from fixing those deadlocks, this patch brings a number of
improvements that together make it possible to run networks of 100+
P4Runtime devices on a single ONOS instance (previously only ~20 devices).
Includes:
- Asynchronous mastership handling in DeviceHandshaker (as defined in
the P4Runtime and OpenFlow specs)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspected deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Repurposed ChannelEvent as DeviceAgentEvent so that it also carries
mastership response events
- Fixed IntelliJ warnings
- Various code and log clean-ups
Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index 00fe9380..2b941cb 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -21,7 +21,6 @@
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import io.grpc.StatusRuntimeException;
import org.onlab.util.SharedExecutors;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeTableMirror;
import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
@@ -32,7 +31,6 @@
import org.onosproject.net.pi.model.PiPipelineInterpreter;
import org.onosproject.net.pi.model.PiPipelineModel;
import org.onosproject.net.pi.model.PiTableId;
-import org.onosproject.net.pi.model.PiTableModel;
import org.onosproject.net.pi.runtime.PiCounterCellData;
import org.onosproject.net.pi.runtime.PiCounterCellId;
import org.onosproject.net.pi.runtime.PiTableEntry;
@@ -48,7 +46,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -110,7 +108,6 @@
}
});
private PiPipelineModel pipelineModel;
- private PiPipelineInterpreter interpreter;
private P4RuntimeTableMirror tableMirror;
private PiFlowRuleTranslator translator;
@@ -125,7 +122,6 @@
log.warn("Unable to get interpreter of {}", deviceId);
return false;
}
- interpreter = device.as(PiPipelineInterpreter.class);
pipelineModel = pipeconf.pipelineModel();
tableMirror = handler().get(P4RuntimeTableMirror.class);
translator = piTranslationService.flowRuleTranslator();
@@ -146,49 +142,35 @@
final ImmutableList.Builder<FlowEntry> result = ImmutableList.builder();
final List<PiTableEntry> inconsistentEntries = Lists.newArrayList();
- for (PiTableModel tableModel : pipelineModel.tables()) {
+ // Read table entries.
+ final Collection<PiTableEntry> installedEntries;
+ // TODO: ONOS-7596 read counters with table entries
+ installedEntries = getFutureWithDeadline(client.dumpAllTables(pipeconf),
+ "dumping tables", Collections.emptyList());
- final PiTableId piTableId = tableModel.id();
+ if (installedEntries.isEmpty()) {
+ return Collections.emptyList();
+ }
- // Read table entries.
- final Collection<PiTableEntry> installedEntries;
- try {
- // TODO: optimize by dumping entries and counters in parallel
- // From ALL tables with the same request.
- installedEntries = client.dumpTable(piTableId, pipeconf).get();
- } catch (InterruptedException | ExecutionException e) {
- if (!(e.getCause() instanceof StatusRuntimeException)) {
- // gRPC errors are logged in the client.
- log.error("Exception while dumping table {} of {}",
- piTableId, deviceId, e);
- }
- continue; // next table
- }
+ // Read table direct counters (if any).
+ final Map<PiTableEntry, PiCounterCellData> counterCellMap =
+ readEntryCounters(installedEntries);
- if (installedEntries.size() == 0) {
- continue; // next table
- }
+ // Forge flow entries with counter values.
+ for (PiTableEntry installedEntry : installedEntries) {
- // Read table direct counters (if any).
- final Map<PiTableEntry, PiCounterCellData> counterCellMap =
- readEntryCounters(installedEntries);
+ final FlowEntry flowEntry = forgeFlowEntry(
+ installedEntry, counterCellMap.get(installedEntry));
- // Forge flow entries with counter values.
- for (PiTableEntry installedEntry : installedEntries) {
-
- final FlowEntry flowEntry = forgeFlowEntry(
- installedEntry, counterCellMap.get(installedEntry));
-
- if (flowEntry == null) {
- // Entry is on device but unknown to translation service or
- // device mirror. Inconsistent. Mark for removal.
- // TODO: make this behaviour configurable
- // In some cases it's fine for the device to have rules
- // that were not installed by us.
- inconsistentEntries.add(installedEntry);
- } else {
- result.add(flowEntry);
- }
+ if (flowEntry == null) {
+ // Entry is on device but unknown to translation service or
+ // device mirror. Inconsistent. Mark for removal.
+ // TODO: make this behaviour configurable
+ // In some cases it's fine for the device to have rules
+ // that were not installed by us.
+ inconsistentEntries.add(installedEntry);
+ } else {
+ result.add(flowEntry);
}
}
@@ -354,19 +336,19 @@
*/
private boolean writeEntry(PiTableEntry entry,
WriteOperationType p4Operation) {
- try {
- if (client.writeTableEntries(
- newArrayList(entry), p4Operation, pipeconf).get()) {
- return true;
- } else {
- log.warn("Unable to {} table entry in {}: {}",
- p4Operation.name(), deviceId, entry);
- }
- } catch (InterruptedException | ExecutionException e) {
- log.warn("Exception while performing {} table entry operation:",
- p4Operation, e);
+ final CompletableFuture<Boolean> future = client.writeTableEntries(
+ newArrayList(entry), p4Operation, pipeconf);
+ final Boolean success = getFutureWithDeadline(
+ future, "performing table " + p4Operation.name(), null);
+ if (success == null) {
+ // Error logged by getFutureWithDeadline();
+ return false;
}
- return false;
+ if (!success) {
+ log.warn("Unable to {} table entry in {}: {}",
+ p4Operation.name(), deviceId, entry);
+ }
+ return success;
}
private void updateStores(PiTableEntryHandle handle,
@@ -392,36 +374,28 @@
private Map<PiTableEntry, PiCounterCellData> readEntryCounters(
Collection<PiTableEntry> tableEntries) {
if (!driverBoolProperty(SUPPORT_TABLE_COUNTERS,
- DEFAULT_SUPPORT_TABLE_COUNTERS)) {
+ DEFAULT_SUPPORT_TABLE_COUNTERS)
+ || tableEntries.isEmpty()) {
return Collections.emptyMap();
}
Collection<PiCounterCellData> cellDatas;
- try {
- if (driverBoolProperty(READ_ALL_DIRECT_COUNTERS,
- DEFAULT_READ_ALL_DIRECT_COUNTERS)) {
- // FIXME: re-implement reading all counters ONOS-7595, or
- // (even better) read counters when dumping table entries ONOS-7596
- // cellDatas = client.readAllCounterCells(
- // singleton(counterId), pipeconf).get();
- cellDatas = Collections.emptyList();
- } else {
- Set<PiCounterCellId> cellIds = tableEntries.stream()
- .filter(e -> tableHasCounter(e.table()))
- .map(PiCounterCellId::ofDirect)
- .collect(Collectors.toSet());
- cellDatas = client.readCounterCells(cellIds, pipeconf).get();
- }
- return cellDatas.stream()
- .collect(Collectors.toMap(c -> c.cellId().tableEntry(), c -> c));
- } catch (InterruptedException | ExecutionException e) {
- if (!(e.getCause() instanceof StatusRuntimeException)) {
- // gRPC errors are logged in the client.
- log.error("Exception while reading table counters from {}: {}",
- deviceId, e);
- }
- return Collections.emptyMap();
+
+ if (driverBoolProperty(READ_ALL_DIRECT_COUNTERS,
+ DEFAULT_READ_ALL_DIRECT_COUNTERS)) {
+ // FIXME: read counters when dumping table entries ONOS-7596
+ cellDatas = Collections.emptyList();
+ } else {
+ Set<PiCounterCellId> cellIds = tableEntries.stream()
+ .filter(e -> tableHasCounter(e.table()))
+ .map(PiCounterCellId::ofDirect)
+ .collect(Collectors.toSet());
+ cellDatas = getFutureWithDeadline(client.readCounterCells(cellIds, pipeconf),
+ "reading table counters", Collections.emptyList());
}
+ return cellDatas.stream()
+ .collect(Collectors.toMap(c -> c.cellId().tableEntry(), c -> c));
+
}
private boolean tableHasCounter(PiTableId tableId) {