Improve scalability of P4Runtime subsystem
The P4Runtime client was hanging (deadlock) on a master arbitration
request. As a result, all other requests (e.g. table writes) were stuck
waiting for the client's request lock to become available.
Apart from fixing those deadlocks, this patch brings a number of
improvements that together allow running networks of 100+ P4Runtime
devices on a single ONOS instance (previously only ~20 devices).
Includes:
- Asynchronous mastership handling in DeviceHandshaker (as defined in
the P4Runtime and OpenFlow specs)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspected deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Repurposed ChannelEvent as DeviceAgentEvent so that it also carries
mastership response events
- Fixed IntelliJ warnings
- Various code and log clean-ups
Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
index 8e2b54d..ac4856a 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
@@ -16,11 +16,13 @@
package org.onosproject.drivers.p4runtime;
+import io.grpc.StatusRuntimeException;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.net.pi.service.PiTranslationService;
import org.onosproject.p4runtime.api.P4RuntimeClient;
@@ -28,6 +30,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
import static com.google.common.base.Preconditions.checkNotNull;
/**
@@ -35,6 +42,10 @@
*/
public class AbstractP4RuntimeHandlerBehaviour extends AbstractHandlerBehaviour {
+ // Timeout in seconds for device operations.
+ // TODO make configurable via driver properties
+ private static final int DEVICE_OP_TIMEOUT = 5;
+
public static final String P4RUNTIME_SERVER_ADDR_KEY = "p4runtime_ip";
public static final String P4RUNTIME_SERVER_PORT_KEY = "p4runtime_port";
public static final String P4RUNTIME_DEVICE_ID_KEY = "p4runtime_deviceId";
@@ -76,12 +87,18 @@
client = controller.getClient(deviceId);
PiPipeconfService piPipeconfService = handler().get(PiPipeconfService.class);
- if (!piPipeconfService.ofDevice(deviceId).isPresent() ||
- !piPipeconfService.getPipeconf(piPipeconfService.ofDevice(deviceId).get()).isPresent()) {
- log.warn("Unable to get the pipeconf of {}, aborting operation", deviceId);
+ if (!piPipeconfService.ofDevice(deviceId).isPresent()) {
+ log.warn("Unable to get assigned pipeconf for {} (mapping " +
+ "missing in PiPipeconfService), aborting operation",
+ deviceId);
return false;
}
- pipeconf = piPipeconfService.getPipeconf(piPipeconfService.ofDevice(deviceId).get()).get();
+ PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).get();
+ if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
+ log.warn("Cannot find any pipeconf with ID '{}' ({}), aborting operation", pipeconfId, deviceId);
+ return false;
+ }
+ pipeconf = piPipeconfService.getPipeconf(pipeconfId).get();
piTranslationService = handler().get(PiTranslationService.class);
@@ -89,12 +106,12 @@
}
/**
- * Create a P4Runtime client for this device. Returns true if the operation
- * was successful, false otherwise.
+ * Returns a P4Runtime client for this device, null if such client cannot
+ * be created.
*
- * @return true if successful, false otherwise
+ * @return client or null
*/
- protected boolean createClient() {
+ P4RuntimeClient createClient() {
deviceId = handler().data().deviceId();
controller = handler().get(P4RuntimeController.class);
@@ -105,24 +122,38 @@
if (serverAddr == null || serverPortString == null || p4DeviceIdString == null) {
log.warn("Unable to create client for {}, missing driver data key (required is {}, {}, and {})",
deviceId, P4RUNTIME_SERVER_ADDR_KEY, P4RUNTIME_SERVER_PORT_KEY, P4RUNTIME_DEVICE_ID_KEY);
- return false;
+ return null;
}
- if (!controller.createClient(deviceId, serverAddr,
- Integer.parseUnsignedInt(serverPortString),
- Long.parseUnsignedLong(p4DeviceIdString))) {
+ final int serverPort;
+ final long p4DeviceId;
+
+ try {
+ serverPort = Integer.parseUnsignedInt(serverPortString);
+ } catch (NumberFormatException e) {
+ log.error("{} is not a valid P4Runtime port number", serverPortString);
+ return null;
+ }
+ try {
+ p4DeviceId = Long.parseUnsignedLong(p4DeviceIdString);
+ } catch (NumberFormatException e) {
+ log.error("{} is not a valid P4Runtime-internal device ID", p4DeviceIdString);
+ return null;
+ }
+
+ if (!controller.createClient(deviceId, serverAddr, serverPort, p4DeviceId)) {
log.warn("Unable to create client for {}, aborting operation", deviceId);
- return false;
+ return null;
}
- return true;
+ return controller.getClient(deviceId);
}
/**
- * Returns the value of the given driver property, if present,
- * otherwise returns the given default value.
+ * Returns the value of the given driver property, if present, otherwise
+ * returns the given default value.
*
- * @param propName property name
+ * @param propName property name
* @param defaultVal default value
* @return boolean
*/
@@ -134,4 +165,36 @@
return Boolean.parseBoolean(handler().driver().getProperty(propName));
}
}
+
+ /**
+ * Convenience method to get the result of a completable future while
+ * setting a timeout and checking for exceptions.
+ *
+ * @param future completable future
+ * @param opDescription operation description to use in log messages. Should
+ * be a sentence starting with a verb ending in -ing,
+ * e.g. "reading...", "writing...", etc.
+ * @param defaultValue value to return if operation fails
+ * @param <U> type of returned value
+ * @return future result or default value
+ */
+ <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
+ U defaultValue) {
+ try {
+ return future.get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.error("Exception while {} on {}", opDescription, deviceId);
+ } catch (ExecutionException e) {
+ final Throwable cause = e.getCause();
+ if (cause instanceof StatusRuntimeException) {
+ final StatusRuntimeException grpcError = (StatusRuntimeException) cause;
+ log.warn("Error while {} on {}: {}", opDescription, deviceId, grpcError.getMessage());
+ } else {
+ log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
+ }
+ } catch (TimeoutException e) {
+ log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
+ }
+ return defaultValue;
+ }
}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
index 6a6c2e9..cfe717d 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
@@ -18,9 +18,8 @@
import org.onlab.util.SharedExecutors;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
-import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.behaviour.PiPipelineProgrammable;
+import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.slf4j.Logger;
@@ -28,14 +27,14 @@
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Abstract implementation of the PiPipelineProgrammable behaviours for a P4Runtime device.
*/
-public abstract class AbstractP4RuntimePipelineProgrammable extends AbstractHandlerBehaviour
+public abstract class AbstractP4RuntimePipelineProgrammable
+ extends AbstractP4RuntimeHandlerBehaviour
implements PiPipelineProgrammable {
protected final Logger log = getLogger(getClass());
@@ -72,25 +71,31 @@
return false;
}
- try {
- if (!client.initStreamChannel().get()) {
- log.warn("Unable to init stream channel to {}.", deviceId);
- return false;
- }
- } catch (InterruptedException | ExecutionException e) {
- log.error("Exception while initializing stream channel on {}", deviceId, e);
+ // We need to be master to perform write RPC to the device.
+ // FIXME: properly perform mastership handling in the device provider
+ // This would probably mean deploying the pipeline after the device has
+ // been notified to the core.
+ final Boolean masterSuccess = getFutureWithDeadline(
+ client.becomeMaster(),
+ "becoming master ", null);
+ if (masterSuccess == null) {
+ // Error already logged by getFutureWithDeadline()
+ return false;
+ } else if (!masterSuccess) {
+ log.warn("Unable to become master for {}, aborting pipeconf deploy", deviceId);
return false;
}
- try {
- if (!client.setPipelineConfig(pipeconf, deviceDataBuffer).get()) {
- log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId);
- return false;
- }
- } catch (InterruptedException | ExecutionException e) {
- log.error("Exception while deploying pipeconf to {}", deviceId, e);
+ final Boolean deploySuccess = getFutureWithDeadline(
+ client.setPipelineConfig(pipeconf, deviceDataBuffer),
+ "deploying pipeconf", null);
+ if (deploySuccess == null) {
+ return false;
+ } else if (!deploySuccess) {
+ log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId);
return false;
}
+
return true;
}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index 00fe9380..2b941cb 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -21,7 +21,6 @@
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import io.grpc.StatusRuntimeException;
import org.onlab.util.SharedExecutors;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeTableMirror;
import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
@@ -32,7 +31,6 @@
import org.onosproject.net.pi.model.PiPipelineInterpreter;
import org.onosproject.net.pi.model.PiPipelineModel;
import org.onosproject.net.pi.model.PiTableId;
-import org.onosproject.net.pi.model.PiTableModel;
import org.onosproject.net.pi.runtime.PiCounterCellData;
import org.onosproject.net.pi.runtime.PiCounterCellId;
import org.onosproject.net.pi.runtime.PiTableEntry;
@@ -48,7 +46,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -110,7 +108,6 @@
}
});
private PiPipelineModel pipelineModel;
- private PiPipelineInterpreter interpreter;
private P4RuntimeTableMirror tableMirror;
private PiFlowRuleTranslator translator;
@@ -125,7 +122,6 @@
log.warn("Unable to get interpreter of {}", deviceId);
return false;
}
- interpreter = device.as(PiPipelineInterpreter.class);
pipelineModel = pipeconf.pipelineModel();
tableMirror = handler().get(P4RuntimeTableMirror.class);
translator = piTranslationService.flowRuleTranslator();
@@ -146,49 +142,35 @@
final ImmutableList.Builder<FlowEntry> result = ImmutableList.builder();
final List<PiTableEntry> inconsistentEntries = Lists.newArrayList();
- for (PiTableModel tableModel : pipelineModel.tables()) {
+ // Read table entries.
+ final Collection<PiTableEntry> installedEntries;
+ // TODO: ONOS-7596 read counters with table entries
+ installedEntries = getFutureWithDeadline(client.dumpAllTables(pipeconf),
+ "dumping tables", Collections.emptyList());
- final PiTableId piTableId = tableModel.id();
+ if (installedEntries.isEmpty()) {
+ return Collections.emptyList();
+ }
- // Read table entries.
- final Collection<PiTableEntry> installedEntries;
- try {
- // TODO: optimize by dumping entries and counters in parallel
- // From ALL tables with the same request.
- installedEntries = client.dumpTable(piTableId, pipeconf).get();
- } catch (InterruptedException | ExecutionException e) {
- if (!(e.getCause() instanceof StatusRuntimeException)) {
- // gRPC errors are logged in the client.
- log.error("Exception while dumping table {} of {}",
- piTableId, deviceId, e);
- }
- continue; // next table
- }
+ // Read table direct counters (if any).
+ final Map<PiTableEntry, PiCounterCellData> counterCellMap =
+ readEntryCounters(installedEntries);
- if (installedEntries.size() == 0) {
- continue; // next table
- }
+ // Forge flow entries with counter values.
+ for (PiTableEntry installedEntry : installedEntries) {
- // Read table direct counters (if any).
- final Map<PiTableEntry, PiCounterCellData> counterCellMap =
- readEntryCounters(installedEntries);
+ final FlowEntry flowEntry = forgeFlowEntry(
+ installedEntry, counterCellMap.get(installedEntry));
- // Forge flow entries with counter values.
- for (PiTableEntry installedEntry : installedEntries) {
-
- final FlowEntry flowEntry = forgeFlowEntry(
- installedEntry, counterCellMap.get(installedEntry));
-
- if (flowEntry == null) {
- // Entry is on device but unknown to translation service or
- // device mirror. Inconsistent. Mark for removal.
- // TODO: make this behaviour configurable
- // In some cases it's fine for the device to have rules
- // that were not installed by us.
- inconsistentEntries.add(installedEntry);
- } else {
- result.add(flowEntry);
- }
+ if (flowEntry == null) {
+ // Entry is on device but unknown to translation service or
+ // device mirror. Inconsistent. Mark for removal.
+ // TODO: make this behaviour configurable
+ // In some cases it's fine for the device to have rules
+ // that were not installed by us.
+ inconsistentEntries.add(installedEntry);
+ } else {
+ result.add(flowEntry);
}
}
@@ -354,19 +336,19 @@
*/
private boolean writeEntry(PiTableEntry entry,
WriteOperationType p4Operation) {
- try {
- if (client.writeTableEntries(
- newArrayList(entry), p4Operation, pipeconf).get()) {
- return true;
- } else {
- log.warn("Unable to {} table entry in {}: {}",
- p4Operation.name(), deviceId, entry);
- }
- } catch (InterruptedException | ExecutionException e) {
- log.warn("Exception while performing {} table entry operation:",
- p4Operation, e);
+ final CompletableFuture<Boolean> future = client.writeTableEntries(
+ newArrayList(entry), p4Operation, pipeconf);
+ final Boolean success = getFutureWithDeadline(
+ future, "performing table " + p4Operation.name(), null);
+ if (success == null) {
+ // Error logged by getFutureWithDeadline();
+ return false;
}
- return false;
+ if (!success) {
+ log.warn("Unable to {} table entry in {}: {}",
+ p4Operation.name(), deviceId, entry);
+ }
+ return success;
}
private void updateStores(PiTableEntryHandle handle,
@@ -392,36 +374,28 @@
private Map<PiTableEntry, PiCounterCellData> readEntryCounters(
Collection<PiTableEntry> tableEntries) {
if (!driverBoolProperty(SUPPORT_TABLE_COUNTERS,
- DEFAULT_SUPPORT_TABLE_COUNTERS)) {
+ DEFAULT_SUPPORT_TABLE_COUNTERS)
+ || tableEntries.isEmpty()) {
return Collections.emptyMap();
}
Collection<PiCounterCellData> cellDatas;
- try {
- if (driverBoolProperty(READ_ALL_DIRECT_COUNTERS,
- DEFAULT_READ_ALL_DIRECT_COUNTERS)) {
- // FIXME: re-implement reading all counters ONOS-7595, or
- // (even better) read counters when dumping table entries ONOS-7596
- // cellDatas = client.readAllCounterCells(
- // singleton(counterId), pipeconf).get();
- cellDatas = Collections.emptyList();
- } else {
- Set<PiCounterCellId> cellIds = tableEntries.stream()
- .filter(e -> tableHasCounter(e.table()))
- .map(PiCounterCellId::ofDirect)
- .collect(Collectors.toSet());
- cellDatas = client.readCounterCells(cellIds, pipeconf).get();
- }
- return cellDatas.stream()
- .collect(Collectors.toMap(c -> c.cellId().tableEntry(), c -> c));
- } catch (InterruptedException | ExecutionException e) {
- if (!(e.getCause() instanceof StatusRuntimeException)) {
- // gRPC errors are logged in the client.
- log.error("Exception while reading table counters from {}: {}",
- deviceId, e);
- }
- return Collections.emptyMap();
+
+ if (driverBoolProperty(READ_ALL_DIRECT_COUNTERS,
+ DEFAULT_READ_ALL_DIRECT_COUNTERS)) {
+ // FIXME: read counters when dumping table entries ONOS-7596
+ cellDatas = Collections.emptyList();
+ } else {
+ Set<PiCounterCellId> cellIds = tableEntries.stream()
+ .filter(e -> tableHasCounter(e.table()))
+ .map(PiCounterCellId::ofDirect)
+ .collect(Collectors.toSet());
+ cellDatas = getFutureWithDeadline(client.readCounterCells(cellIds, pipeconf),
+ "reading table counters", Collections.emptyList());
}
+ return cellDatas.stream()
+ .collect(Collectors.toMap(c -> c.cellId().tableEntry(), c -> c));
+
}
private boolean tableHasCounter(PiTableId tableId) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
index dc2ec69..02c0fa3 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
@@ -42,12 +42,12 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static java.lang.String.format;
import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.DELETE;
import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.INSERT;
import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.MODIFY;
@@ -226,7 +226,7 @@
if (!membersToRemove.isEmpty() &&
!completeFuture(client.writeActionGroupMembers(groupProfileId, membersToRemove, DELETE, pipeconf),
- ACT_GRP_MEMS_STR, DELETE_STR)) {
+ ACT_GRP_MEMS_STR, DELETE_STR)) {
// add what we removed
completeFuture(client.writeActionGroupMembers(groupProfileId, membersToRemove, INSERT, pipeconf),
ACT_GRP_MEMS_STR, DELETE_STR);
@@ -271,31 +271,18 @@
private boolean completeFuture(CompletableFuture<Boolean> completableFuture,
String topic, String action) {
- try {
- if (completableFuture.get()) {
- return true;
- } else {
- log.warn("Unable to {} {}", action, topic);
- return false;
- }
- } catch (InterruptedException | ExecutionException e) {
- log.warn("Exception while performing {} {}: {}", action, topic, e.getMessage());
- log.debug("Exception", e);
- return false;
- }
+ return getFutureWithDeadline(
+ completableFuture, format("performing %s %s", action, topic), false);
}
private Stream<Group> streamGroupsFromDevice(PiActionProfileId actProfId) {
- try {
- // Read PI groups and return original PD one.
- return client.dumpGroups(actProfId, pipeconf).get().stream()
- .map(this::forgeGroupEntry)
- .filter(Objects::nonNull);
- } catch (ExecutionException | InterruptedException e) {
- log.error("Exception while dumping groups from action profile '{}' on {}: {}",
- actProfId.id(), deviceId, e);
- return Stream.empty();
- }
+ // Read PI groups and return original PD one.
+ Collection<PiActionGroup> groups = getFutureWithDeadline(
+ client.dumpGroups(actProfId, pipeconf),
+ "dumping groups", Collections.emptyList());
+ return groups.stream()
+ .map(this::forgeGroupEntry)
+ .filter(Objects::nonNull);
}
private Group forgeGroupEntry(PiActionGroup piGroup) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
index 58cab8a..23fcf1f 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
@@ -18,7 +18,7 @@
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
-import org.onosproject.net.device.ChannelListener;
+import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.device.DeviceHandshaker;
import org.onosproject.p4runtime.api.P4RuntimeController;
@@ -29,72 +29,60 @@
*/
public class P4RuntimeHandshaker extends AbstractP4RuntimeHandlerBehaviour implements DeviceHandshaker {
- // TODO: consider abstract class with empty connect method and implementation into a protected one for reusability.
-
@Override
public CompletableFuture<Boolean> connect() {
- return CompletableFuture.supplyAsync(this::doConnect);
- }
-
- private boolean doConnect() {
- return super.createClient();
+ return CompletableFuture
+ .supplyAsync(super::createClient)
+ .thenCompose(client -> {
+ if (client == null) {
+ return CompletableFuture.completedFuture(false);
+ }
+ return client.start();
+ });
}
@Override
public CompletableFuture<Boolean> disconnect() {
- return CompletableFuture.supplyAsync(() -> {
- P4RuntimeController controller = handler().get(P4RuntimeController.class);
- DeviceId deviceId = handler().data().deviceId();
- controller.removeClient(deviceId);
- return true;
- });
+ final P4RuntimeController controller = handler().get(P4RuntimeController.class);
+ final DeviceId deviceId = handler().data().deviceId();
+ if (!controller.hasClient(deviceId)) {
+ return CompletableFuture.completedFuture(true);
+ } else {
+ return controller.getClient(deviceId).shutdown()
+ .thenApplyAsync(v -> {
+ controller.removeClient(deviceId);
+ return true;
+ });
+ }
}
@Override
public CompletableFuture<Boolean> isReachable() {
- return CompletableFuture.supplyAsync(() -> {
- P4RuntimeController controller = handler().get(P4RuntimeController.class);
- DeviceId deviceId = handler().data().deviceId();
- return controller.isReacheable(deviceId);
- });
+ return CompletableFuture.supplyAsync(() -> handler()
+ .get(P4RuntimeController.class)
+ .isReacheable(handler().data().deviceId())
+ );
}
@Override
- public CompletableFuture<MastershipRole> roleChanged(MastershipRole newRole) {
- deviceId = handler().data().deviceId();
- controller = handler().get(P4RuntimeController.class);
- CompletableFuture<MastershipRole> result = new CompletableFuture<>();
-
- client = controller.getClient(deviceId);
- if (client == null || !controller.isReacheable(deviceId)) {
- result.complete(MastershipRole.NONE);
- return result;
- }
- if (newRole.equals(MastershipRole.MASTER)) {
- client.sendMasterArbitrationUpdate().thenAcceptAsync(success -> {
- if (!success) {
- log.warn("Device {} arbitration failed", deviceId);
- result.complete(MastershipRole.STANDBY);
- } else {
- result.complete(MastershipRole.MASTER);
+ public void roleChanged(MastershipRole newRole) {
+ if (setupBehaviour() && newRole.equals(MastershipRole.MASTER)) {
+ client.becomeMaster().thenAcceptAsync(result -> {
+ if (!result) {
+ log.error("Unable to notify mastership role {} to {}",
+ newRole, deviceId);
}
});
- } else {
- // Since we don't need to do anything, we can complete it directly
- // Spec: The client with the highest election id is referred to as the
- // "master", while all other clients are referred to as "slaves".
- result.complete(newRole);
}
- return result;
}
@Override
- public void addChannelListener(ChannelListener listener) {
- controller.addChannelListener(deviceId, listener);
+ public void addDeviceAgentListener(DeviceAgentListener listener) {
+ controller.addDeviceAgentListener(deviceId, listener);
}
@Override
- public void removeChannelListener(ChannelListener listener) {
- controller.removeChannelListener(deviceId, listener);
+ public void removeDeviceAgentListener(DeviceAgentListener listener) {
+ controller.removeDeviceAgentListener(deviceId, listener);
}
}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
index d7a204e..39ed84d 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
@@ -16,9 +16,9 @@
package org.onosproject.drivers.p4runtime;
-import com.google.common.cache.LoadingCache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMeterMirror;
import org.onosproject.net.meter.Band;
import org.onosproject.net.meter.DefaultBand;
@@ -37,11 +37,11 @@
import java.util.Collection;
import java.util.Collections;
-import java.util.Set;
import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -108,18 +108,13 @@
final PiMeterHandle handle = PiMeterHandle.of(deviceId, piMeterCellConfig);
ENTRY_LOCKS.getUnchecked(handle).lock();
- boolean result = false;
- try {
- if (client.writeMeterCells(newArrayList(piMeterCellConfig), pipeconf).get()) {
- meterMirror.put(handle, piMeterCellConfig);
- result = true;
- }
-
- } catch (InterruptedException | ExecutionException e) {
- log.warn("Exception while modify meter entry:", e);
- } finally {
- ENTRY_LOCKS.getUnchecked(handle).unlock();
+ final boolean result = getFutureWithDeadline(
+ client.writeMeterCells(newArrayList(piMeterCellConfig), pipeconf),
+ "writing meter cells", false);
+ if (result) {
+ meterMirror.put(handle, piMeterCellConfig);
}
+ ENTRY_LOCKS.getUnchecked(handle).unlock();
return result;
}
@@ -162,4 +157,4 @@
return CompletableFuture.completedFuture(meters);
}
-}
\ No newline at end of file
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
index 6c61148..cc79cc1 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
@@ -26,7 +26,9 @@
/**
* Implementation of PacketProgrammable behaviour for P4Runtime.
*/
-public class P4RuntimePacketProgrammable extends AbstractP4RuntimeHandlerBehaviour implements PacketProgrammable {
+public class P4RuntimePacketProgrammable
+ extends AbstractP4RuntimeHandlerBehaviour
+ implements PacketProgrammable {
@Override
public void emit(OutboundPacket packet) {
@@ -38,7 +40,8 @@
final PiPipelineInterpreter interpreter = device.is(PiPipelineInterpreter.class)
? device.as(PiPipelineInterpreter.class) : null;
if (interpreter == null) {
- log.warn("Device {} with pipeconf {} has no interpreter, aborting emit operation", deviceId, pipeconf.id());
+ log.warn("Unable to get interpreter for {} with pipeconf {}, aborting emit operation",
+ deviceId, pipeconf.id());
return;
}
@@ -46,11 +49,13 @@
Collection<PiPacketOperation> operations = interpreter.mapOutboundPacket(packet);
operations.forEach(piPacketOperation -> {
log.debug("Doing PiPacketOperation {}", piPacketOperation);
- client.packetOut(piPacketOperation, pipeconf);
+ getFutureWithDeadline(
+ client.packetOut(piPacketOperation, pipeconf),
+ "sending packet-out", false);
});
} catch (PiPipelineInterpreter.PiInterpreterException e) {
- log.error("Interpreter of pipeconf {} was unable to translate outbound packet: {}",
- pipeconf.id(), e.getMessage());
+ log.error("Unable to translate outbound packet for {} with pipeconf {}: {}",
+ deviceId, pipeconf.id(), e.getMessage());
}
}
}