Improve scalability of P4Runtime subsystem
The P4Runtime client was hanging (deadlock) on a master arbitration
request. As such, all other requests (e.g. table write) were waiting
for the client's request lock to become available.
Apart from fixing those deadlocks, this patch brings a number of
improvements that all together allow to run networks of 100+ P4Runtime
devices on a single ONOS instance (before only ~20 devices)
Includes:
- Asynchrounous mastership handling in DevicHandshaker (as defined in
the P4Runtime and OpenFlow spec)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspect deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Re-purposed ChannelEvent to DeviceAgentEvent to carry also mastership
response events
- Fixed IntelliJ warnings
- Various code and log clean-ups
Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/core/api/src/main/java/org/onosproject/net/device/ChannelEvent.java b/core/api/src/main/java/org/onosproject/net/device/ChannelEvent.java
deleted file mode 100644
index 98ea5cb..0000000
--- a/core/api/src/main/java/org/onosproject/net/device/ChannelEvent.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.net.device;
-
-import org.onlab.util.Tools;
-import org.onosproject.event.AbstractEvent;
-import org.onosproject.net.DeviceId;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-/**
- * Describes event related to a channel established by ONOS with a device.
- */
-public class ChannelEvent extends AbstractEvent<ChannelEvent.Type, DeviceId> {
-
- private final Throwable throwable;
-
- /**
- * Type of device events.
- */
- public enum Type {
- /**
- * Signifies that the channel has properly connected.
- */
- CHANNEL_CONNECTED,
-
- /**
- * Signifies that the channel has disconnected.
- */
- CHANNEL_DISCONNECTED,
-
- /**
- * Signifies that an error happened on the channel with the given device.
- */
- CHANNEL_ERROR
-
- }
-
- /**
- * Creates an event of a given type and for the specified device.
- *
- * @param type device event type
- * @param deviceId event device subject
- */
- public ChannelEvent(Type type, DeviceId deviceId) {
- this(type, deviceId, null);
- }
-
- /**
- * Creates an event of a given type and for the specified device, given a certain throwable.
- *
- * @param type device event type
- * @param deviceId event device subject
- * @param throwable exception happened on the channel
- */
- public ChannelEvent(Type type, DeviceId deviceId, Throwable throwable) {
- super(type, deviceId);
- this.throwable = throwable;
- }
-
- /**
- * Creates an event of a given type and for the specified device and the current time.
- *
- * @param type device event type
- * @param deviceId event device subject
- * @param throwable exception happened on the channel
- * @param time occurrence time
- */
- public ChannelEvent(Type type, DeviceId deviceId, Throwable throwable, long time) {
- super(type, deviceId, time);
- this.throwable = throwable;
- }
-
- /**
- * Returns the exception that happened on the channel.
- *
- * @return a throwable if associated to the event, otherwise null.
- */
- public Throwable throwable() {
- return throwable;
- }
-
- @Override
- public String toString() {
- if (throwable == null) {
- return super.toString();
- }
- return toStringHelper(this)
- .add("time", Tools.defaultOffsetDataTime(time()))
- .add("type", type())
- .add("subject", subject())
- .add("throwable", throwable)
- .toString();
- }
-}
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceAgentEvent.java b/core/api/src/main/java/org/onosproject/net/device/DeviceAgentEvent.java
new file mode 100644
index 0000000..22eab99
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceAgentEvent.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.device;
+
+import org.onosproject.event.AbstractEvent;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Describes and event related to a protocol agent used to interact with an
+ * infrastructure device.
+ */
+public final class DeviceAgentEvent
+ extends AbstractEvent<DeviceAgentEvent.Type, DeviceId> {
+
+ /**
+ * Type of device events.
+ */
+ public enum Type {
+ /**
+ * Signifies that a channel between the agent and the device is open and
+ * the two can communicate.
+ */
+ CHANNEL_OPEN,
+
+ /**
+ * Signifies that a channel between the agent and the device is closed
+ * and the two cannot communicate.
+ */
+ CHANNEL_CLOSED,
+
+ /**
+ * Signifies that a channel error has been detected. Further
+ * investigation should be performed to check if the channel is still
+ * open or closed.
+ */
+ CHANNEL_ERROR,
+
+ /**
+ * Signifies that the agent has acquired master role.
+ */
+ ROLE_MASTER,
+
+ /**
+ * Signifies that the agent has standby/slave mastership role.
+ */
+ ROLE_STANDBY,
+
+ /**
+ * Signifies that the agent cannot acquire any valid mastership role for
+ * the device.
+ */
+ ROLE_NONE,
+
+ }
+
+ /**
+ * Creates a new device agent event for the given type and device ID.
+ *
+ * @param type event type
+ * @param deviceId device ID
+ */
+ public DeviceAgentEvent(Type type, DeviceId deviceId) {
+ super(type, deviceId);
+ }
+
+ /**
+ * Creates a new device agent event for the given type, device ID and time.
+ *
+ * @param type event type
+ * @param deviceId device ID
+ * @param time occurrence time
+ */
+ public DeviceAgentEvent(Type type, DeviceId deviceId, long time) {
+ super(type, deviceId, time);
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/device/ChannelListener.java b/core/api/src/main/java/org/onosproject/net/device/DeviceAgentListener.java
similarity index 79%
rename from core/api/src/main/java/org/onosproject/net/device/ChannelListener.java
rename to core/api/src/main/java/org/onosproject/net/device/DeviceAgentListener.java
index cdc90bf70..c034dc2 100644
--- a/core/api/src/main/java/org/onosproject/net/device/ChannelListener.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceAgentListener.java
@@ -18,7 +18,8 @@
import org.onosproject.event.EventListener;
/**
- * A listener to receive events related to a transport channel established with a device.
+ * A listener to receive events related to a protocol agent controlling an
+ * infrastructure device.
*/
-public interface ChannelListener extends EventListener<ChannelEvent> {
+public interface DeviceAgentListener extends EventListener<DeviceAgentEvent> {
}
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
index 606248a..bc870d3 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
@@ -30,38 +30,41 @@
public interface DeviceHandshaker extends DeviceConnect {
/**
- * Checks the reachability (connectivity) of a device.
- * Reachability, unlike availability, denotes whether THIS particular node
- * can send messages and receive replies from the specified device.
+ * Checks the reachability (connectivity) of a device. Reachability, unlike
+ * availability, denotes whether THIS particular node can send messages and
+ * receive replies from the specified device.
*
* @return CompletableFuture eventually true if reachable, false otherwise
*/
CompletableFuture<Boolean> isReachable();
/**
- * Applies on the device a mastership role change as decided by the core.
+ * Notifies the device a mastership role change as decided by the core. The
+ * implementation of this method should trigger a {@link DeviceAgentEvent}
+ * signaling the mastership role accepted by the device.
*
- * @param newRole newly determined mastership role
- * @return CompletableFuture with the mastership role accepted from the device
+ * @param newRole new mastership role
*/
- CompletableFuture<MastershipRole> roleChanged(MastershipRole newRole);
+ void roleChanged(MastershipRole newRole);
/**
- * Applies a listener to a channel established with the device.
+ * Adds a device agent listener.
*
- * @param listener the channel listener
+ * @param listener device agent listener
*/
- default void addChannelListener(ChannelListener listener) {
- throw new UnsupportedOperationException("Listener Registration not supported");
+ default void addDeviceAgentListener(DeviceAgentListener listener) {
+ throw new UnsupportedOperationException(
+ "Device agent listener registration not supported");
}
/**
- * Removes a listener to a channel established with the device.
+ * Removes a device agent listener.
*
- * @param listener the channel listener
+ * @param listener device agent listener
*/
- default void removeChannelListener(ChannelListener listener) {
- throw new UnsupportedOperationException("Listener Removal not supported");
+ default void removeDeviceAgentListener(DeviceAgentListener listener) {
+ throw new UnsupportedOperationException(
+ "Device agent listener removal not supported");
}
}
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 414db4a..09d95b4 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -798,7 +798,8 @@
}
} else {
// we didn't get back what we asked for. Reelect someone else.
- log.warn("Failed to assert role [{}] onto Device {}", response, deviceId);
+ log.warn("Failed to assert role onto device {}. requested={}, response={}",
+ deviceId, requested, response);
if (requested == MastershipRole.MASTER) {
mastershipService.relinquishMastership(deviceId);
// TODO: Shouldn't we be triggering event?
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslatorImpl.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslatorImpl.java
index 5741e65..38efdb9 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslatorImpl.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslatorImpl.java
@@ -146,7 +146,7 @@
if (tableModel.supportsAging()) {
tableEntryBuilder.withTimeout((double) rule.timeout());
} else {
- log.warn("Flow rule is temporary, but table '{}' doesn't support " +
+ log.debug("Flow rule is temporary, but table '{}' doesn't support " +
"aging, translating to permanent.", tableModel.id());
}
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index 212cf1c..55a4a11 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -139,7 +139,6 @@
@Override
public void register(PiPipeconf pipeconf) throws IllegalStateException {
- log.warn("Currently using local maps, needs to be moved to a distributed store");
if (piPipeconfs.containsKey(pipeconf.id())) {
throw new IllegalStateException(format("Pipeconf %s is already registered", pipeconf.id()));
}
@@ -262,7 +261,7 @@
cfgService.getConfig(deviceId, PiPipeconfConfig.class);
PiPipeconfId id = pipeconfConfig.piPipeconfId();
if (id.id().equals("")) {
- log.warn("Not adding empty pipeconfId for device {}", deviceId);
+ log.debug("Ignoring empty pipeconf ID for device {}", deviceId);
} else {
pipeconfMappingStore.createOrUpdateBinding(deviceId, id);
}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
index 8e2b54d..ac4856a 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
@@ -16,11 +16,13 @@
package org.onosproject.drivers.p4runtime;
+import io.grpc.StatusRuntimeException;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.net.pi.service.PiTranslationService;
import org.onosproject.p4runtime.api.P4RuntimeClient;
@@ -28,6 +30,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
import static com.google.common.base.Preconditions.checkNotNull;
/**
@@ -35,6 +42,10 @@
*/
public class AbstractP4RuntimeHandlerBehaviour extends AbstractHandlerBehaviour {
+ // Timeout in seconds for device operations.
+ // TODO make configurable via driver properties
+ private static final int DEVICE_OP_TIMEOUT = 5;
+
public static final String P4RUNTIME_SERVER_ADDR_KEY = "p4runtime_ip";
public static final String P4RUNTIME_SERVER_PORT_KEY = "p4runtime_port";
public static final String P4RUNTIME_DEVICE_ID_KEY = "p4runtime_deviceId";
@@ -76,12 +87,18 @@
client = controller.getClient(deviceId);
PiPipeconfService piPipeconfService = handler().get(PiPipeconfService.class);
- if (!piPipeconfService.ofDevice(deviceId).isPresent() ||
- !piPipeconfService.getPipeconf(piPipeconfService.ofDevice(deviceId).get()).isPresent()) {
- log.warn("Unable to get the pipeconf of {}, aborting operation", deviceId);
+ if (!piPipeconfService.ofDevice(deviceId).isPresent()) {
+ log.warn("Unable to get assigned pipeconf for {} (mapping " +
+ "missing in PiPipeconfService), aborting operation",
+ deviceId);
return false;
}
- pipeconf = piPipeconfService.getPipeconf(piPipeconfService.ofDevice(deviceId).get()).get();
+ PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).get();
+ if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
+ log.warn("Cannot find any pipeconf with ID '{}' ({}), aborting operation", pipeconfId, deviceId);
+ return false;
+ }
+ pipeconf = piPipeconfService.getPipeconf(pipeconfId).get();
piTranslationService = handler().get(PiTranslationService.class);
@@ -89,12 +106,12 @@
}
/**
- * Create a P4Runtime client for this device. Returns true if the operation
- * was successful, false otherwise.
+ * Returns a P4Runtime client for this device, null if such client cannot
+ * be created.
*
- * @return true if successful, false otherwise
+ * @return client or null
*/
- protected boolean createClient() {
+ P4RuntimeClient createClient() {
deviceId = handler().data().deviceId();
controller = handler().get(P4RuntimeController.class);
@@ -105,24 +122,38 @@
if (serverAddr == null || serverPortString == null || p4DeviceIdString == null) {
log.warn("Unable to create client for {}, missing driver data key (required is {}, {}, and {})",
deviceId, P4RUNTIME_SERVER_ADDR_KEY, P4RUNTIME_SERVER_PORT_KEY, P4RUNTIME_DEVICE_ID_KEY);
- return false;
+ return null;
}
- if (!controller.createClient(deviceId, serverAddr,
- Integer.parseUnsignedInt(serverPortString),
- Long.parseUnsignedLong(p4DeviceIdString))) {
+ final int serverPort;
+ final long p4DeviceId;
+
+ try {
+ serverPort = Integer.parseUnsignedInt(serverPortString);
+ } catch (NumberFormatException e) {
+ log.error("{} is not a valid P4Runtime port number", serverPortString);
+ return null;
+ }
+ try {
+ p4DeviceId = Long.parseUnsignedLong(p4DeviceIdString);
+ } catch (NumberFormatException e) {
+ log.error("{} is not a valid P4Runtime-internal device ID", p4DeviceIdString);
+ return null;
+ }
+
+ if (!controller.createClient(deviceId, serverAddr, serverPort, p4DeviceId)) {
log.warn("Unable to create client for {}, aborting operation", deviceId);
- return false;
+ return null;
}
- return true;
+ return controller.getClient(deviceId);
}
/**
- * Returns the value of the given driver property, if present,
- * otherwise returns the given default value.
+ * Returns the value of the given driver property, if present, otherwise
+ * returns the given default value.
*
- * @param propName property name
+ * @param propName property name
* @param defaultVal default value
* @return boolean
*/
@@ -134,4 +165,36 @@
return Boolean.parseBoolean(handler().driver().getProperty(propName));
}
}
+
+ /**
+ * Convenience method to get the result of a completable future while
+ * setting a timeout and checking for exceptions.
+ *
+ * @param future completable future
+ * @param opDescription operation description to use in log messages. Should
+ * be a sentence starting with a verb ending in -ing,
+ * e.g. "reading...", "writing...", etc.
+ * @param defaultValue value to return if operation fails
+ * @param <U> type of returned value
+ * @return future result or default value
+ */
+ <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
+ U defaultValue) {
+ try {
+ return future.get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.error("Exception while {} on {}", opDescription, deviceId);
+ } catch (ExecutionException e) {
+ final Throwable cause = e.getCause();
+ if (cause instanceof StatusRuntimeException) {
+ final StatusRuntimeException grpcError = (StatusRuntimeException) cause;
+ log.warn("Error while {} on {}: {}", opDescription, deviceId, grpcError.getMessage());
+ } else {
+ log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
+ }
+ } catch (TimeoutException e) {
+ log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
+ }
+ return defaultValue;
+ }
}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
index 6a6c2e9..cfe717d 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
@@ -18,9 +18,8 @@
import org.onlab.util.SharedExecutors;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
-import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.behaviour.PiPipelineProgrammable;
+import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.slf4j.Logger;
@@ -28,14 +27,14 @@
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Abstract implementation of the PiPipelineProgrammable behaviours for a P4Runtime device.
*/
-public abstract class AbstractP4RuntimePipelineProgrammable extends AbstractHandlerBehaviour
+public abstract class AbstractP4RuntimePipelineProgrammable
+ extends AbstractP4RuntimeHandlerBehaviour
implements PiPipelineProgrammable {
protected final Logger log = getLogger(getClass());
@@ -72,25 +71,31 @@
return false;
}
- try {
- if (!client.initStreamChannel().get()) {
- log.warn("Unable to init stream channel to {}.", deviceId);
- return false;
- }
- } catch (InterruptedException | ExecutionException e) {
- log.error("Exception while initializing stream channel on {}", deviceId, e);
+ // We need to be master to perform write RPC to the device.
+ // FIXME: properly perform mastership handling in the device provider
+ // This would probably mean deploying the pipeline after the device as
+ // been notified to the core.
+ final Boolean masterSuccess = getFutureWithDeadline(
+ client.becomeMaster(),
+ "becoming master ", null);
+ if (masterSuccess == null) {
+ // Error already logged by getFutureWithDeadline()
+ return false;
+ } else if (!masterSuccess) {
+ log.warn("Unable to become master for {}, aborting pipeconf deploy", deviceId);
return false;
}
- try {
- if (!client.setPipelineConfig(pipeconf, deviceDataBuffer).get()) {
- log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId);
- return false;
- }
- } catch (InterruptedException | ExecutionException e) {
- log.error("Exception while deploying pipeconf to {}", deviceId, e);
+ final Boolean deploySuccess = getFutureWithDeadline(
+ client.setPipelineConfig(pipeconf, deviceDataBuffer),
+ "deploying pipeconf", null);
+ if (deploySuccess == null) {
+ return false;
+ } else if (!deploySuccess) {
+ log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId);
return false;
}
+
return true;
}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index 00fe9380..2b941cb 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -21,7 +21,6 @@
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import io.grpc.StatusRuntimeException;
import org.onlab.util.SharedExecutors;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeTableMirror;
import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
@@ -32,7 +31,6 @@
import org.onosproject.net.pi.model.PiPipelineInterpreter;
import org.onosproject.net.pi.model.PiPipelineModel;
import org.onosproject.net.pi.model.PiTableId;
-import org.onosproject.net.pi.model.PiTableModel;
import org.onosproject.net.pi.runtime.PiCounterCellData;
import org.onosproject.net.pi.runtime.PiCounterCellId;
import org.onosproject.net.pi.runtime.PiTableEntry;
@@ -48,7 +46,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -110,7 +108,6 @@
}
});
private PiPipelineModel pipelineModel;
- private PiPipelineInterpreter interpreter;
private P4RuntimeTableMirror tableMirror;
private PiFlowRuleTranslator translator;
@@ -125,7 +122,6 @@
log.warn("Unable to get interpreter of {}", deviceId);
return false;
}
- interpreter = device.as(PiPipelineInterpreter.class);
pipelineModel = pipeconf.pipelineModel();
tableMirror = handler().get(P4RuntimeTableMirror.class);
translator = piTranslationService.flowRuleTranslator();
@@ -146,49 +142,35 @@
final ImmutableList.Builder<FlowEntry> result = ImmutableList.builder();
final List<PiTableEntry> inconsistentEntries = Lists.newArrayList();
- for (PiTableModel tableModel : pipelineModel.tables()) {
+ // Read table entries.
+ final Collection<PiTableEntry> installedEntries;
+ // TODO: ONOS-7596 read counters with table entries
+ installedEntries = getFutureWithDeadline(client.dumpAllTables(pipeconf),
+ "dumping tables", Collections.emptyList());
- final PiTableId piTableId = tableModel.id();
+ if (installedEntries.isEmpty()) {
+ return Collections.emptyList();
+ }
- // Read table entries.
- final Collection<PiTableEntry> installedEntries;
- try {
- // TODO: optimize by dumping entries and counters in parallel
- // From ALL tables with the same request.
- installedEntries = client.dumpTable(piTableId, pipeconf).get();
- } catch (InterruptedException | ExecutionException e) {
- if (!(e.getCause() instanceof StatusRuntimeException)) {
- // gRPC errors are logged in the client.
- log.error("Exception while dumping table {} of {}",
- piTableId, deviceId, e);
- }
- continue; // next table
- }
+ // Read table direct counters (if any).
+ final Map<PiTableEntry, PiCounterCellData> counterCellMap =
+ readEntryCounters(installedEntries);
- if (installedEntries.size() == 0) {
- continue; // next table
- }
+ // Forge flow entries with counter values.
+ for (PiTableEntry installedEntry : installedEntries) {
- // Read table direct counters (if any).
- final Map<PiTableEntry, PiCounterCellData> counterCellMap =
- readEntryCounters(installedEntries);
+ final FlowEntry flowEntry = forgeFlowEntry(
+ installedEntry, counterCellMap.get(installedEntry));
- // Forge flow entries with counter values.
- for (PiTableEntry installedEntry : installedEntries) {
-
- final FlowEntry flowEntry = forgeFlowEntry(
- installedEntry, counterCellMap.get(installedEntry));
-
- if (flowEntry == null) {
- // Entry is on device but unknown to translation service or
- // device mirror. Inconsistent. Mark for removal.
- // TODO: make this behaviour configurable
- // In some cases it's fine for the device to have rules
- // that were not installed by us.
- inconsistentEntries.add(installedEntry);
- } else {
- result.add(flowEntry);
- }
+ if (flowEntry == null) {
+ // Entry is on device but unknown to translation service or
+ // device mirror. Inconsistent. Mark for removal.
+ // TODO: make this behaviour configurable
+ // In some cases it's fine for the device to have rules
+ // that were not installed by us.
+ inconsistentEntries.add(installedEntry);
+ } else {
+ result.add(flowEntry);
}
}
@@ -354,19 +336,19 @@
*/
private boolean writeEntry(PiTableEntry entry,
WriteOperationType p4Operation) {
- try {
- if (client.writeTableEntries(
- newArrayList(entry), p4Operation, pipeconf).get()) {
- return true;
- } else {
- log.warn("Unable to {} table entry in {}: {}",
- p4Operation.name(), deviceId, entry);
- }
- } catch (InterruptedException | ExecutionException e) {
- log.warn("Exception while performing {} table entry operation:",
- p4Operation, e);
+ final CompletableFuture<Boolean> future = client.writeTableEntries(
+ newArrayList(entry), p4Operation, pipeconf);
+ final Boolean success = getFutureWithDeadline(
+ future, "performing table " + p4Operation.name(), null);
+ if (success == null) {
+ // Error logged by getFutureWithDeadline();
+ return false;
}
- return false;
+ if (!success) {
+ log.warn("Unable to {} table entry in {}: {}",
+ p4Operation.name(), deviceId, entry);
+ }
+ return success;
}
private void updateStores(PiTableEntryHandle handle,
@@ -392,36 +374,28 @@
private Map<PiTableEntry, PiCounterCellData> readEntryCounters(
Collection<PiTableEntry> tableEntries) {
if (!driverBoolProperty(SUPPORT_TABLE_COUNTERS,
- DEFAULT_SUPPORT_TABLE_COUNTERS)) {
+ DEFAULT_SUPPORT_TABLE_COUNTERS)
+ || tableEntries.isEmpty()) {
return Collections.emptyMap();
}
Collection<PiCounterCellData> cellDatas;
- try {
- if (driverBoolProperty(READ_ALL_DIRECT_COUNTERS,
- DEFAULT_READ_ALL_DIRECT_COUNTERS)) {
- // FIXME: re-implement reading all counters ONOS-7595, or
- // (even better) read counters when dumping table entries ONOS-7596
- // cellDatas = client.readAllCounterCells(
- // singleton(counterId), pipeconf).get();
- cellDatas = Collections.emptyList();
- } else {
- Set<PiCounterCellId> cellIds = tableEntries.stream()
- .filter(e -> tableHasCounter(e.table()))
- .map(PiCounterCellId::ofDirect)
- .collect(Collectors.toSet());
- cellDatas = client.readCounterCells(cellIds, pipeconf).get();
- }
- return cellDatas.stream()
- .collect(Collectors.toMap(c -> c.cellId().tableEntry(), c -> c));
- } catch (InterruptedException | ExecutionException e) {
- if (!(e.getCause() instanceof StatusRuntimeException)) {
- // gRPC errors are logged in the client.
- log.error("Exception while reading table counters from {}: {}",
- deviceId, e);
- }
- return Collections.emptyMap();
+
+ if (driverBoolProperty(READ_ALL_DIRECT_COUNTERS,
+ DEFAULT_READ_ALL_DIRECT_COUNTERS)) {
+ // FIXME: read counters when dumping table entries ONOS-7596
+ cellDatas = Collections.emptyList();
+ } else {
+ Set<PiCounterCellId> cellIds = tableEntries.stream()
+ .filter(e -> tableHasCounter(e.table()))
+ .map(PiCounterCellId::ofDirect)
+ .collect(Collectors.toSet());
+ cellDatas = getFutureWithDeadline(client.readCounterCells(cellIds, pipeconf),
+ "reading table counters", Collections.emptyList());
}
+ return cellDatas.stream()
+ .collect(Collectors.toMap(c -> c.cellId().tableEntry(), c -> c));
+
}
private boolean tableHasCounter(PiTableId tableId) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
index dc2ec69..02c0fa3 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
@@ -42,12 +42,12 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static java.lang.String.format;
import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.DELETE;
import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.INSERT;
import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.MODIFY;
@@ -226,7 +226,7 @@
if (!membersToRemove.isEmpty() &&
!completeFuture(client.writeActionGroupMembers(groupProfileId, membersToRemove, DELETE, pipeconf),
- ACT_GRP_MEMS_STR, DELETE_STR)) {
+ ACT_GRP_MEMS_STR, DELETE_STR)) {
// add what we removed
completeFuture(client.writeActionGroupMembers(groupProfileId, membersToRemove, INSERT, pipeconf),
ACT_GRP_MEMS_STR, DELETE_STR);
@@ -271,31 +271,18 @@
private boolean completeFuture(CompletableFuture<Boolean> completableFuture,
String topic, String action) {
- try {
- if (completableFuture.get()) {
- return true;
- } else {
- log.warn("Unable to {} {}", action, topic);
- return false;
- }
- } catch (InterruptedException | ExecutionException e) {
- log.warn("Exception while performing {} {}: {}", action, topic, e.getMessage());
- log.debug("Exception", e);
- return false;
- }
+ return getFutureWithDeadline(
+ completableFuture, format("performing %s %s", action, topic), false);
}
private Stream<Group> streamGroupsFromDevice(PiActionProfileId actProfId) {
- try {
- // Read PI groups and return original PD one.
- return client.dumpGroups(actProfId, pipeconf).get().stream()
- .map(this::forgeGroupEntry)
- .filter(Objects::nonNull);
- } catch (ExecutionException | InterruptedException e) {
- log.error("Exception while dumping groups from action profile '{}' on {}: {}",
- actProfId.id(), deviceId, e);
- return Stream.empty();
- }
+ // Read PI groups and return original PD one.
+ Collection<PiActionGroup> groups = getFutureWithDeadline(
+ client.dumpGroups(actProfId, pipeconf),
+ "dumping groups", Collections.emptyList());
+ return groups.stream()
+ .map(this::forgeGroupEntry)
+ .filter(Objects::nonNull);
}
private Group forgeGroupEntry(PiActionGroup piGroup) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
index 58cab8a..23fcf1f 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
@@ -18,7 +18,7 @@
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
-import org.onosproject.net.device.ChannelListener;
+import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.device.DeviceHandshaker;
import org.onosproject.p4runtime.api.P4RuntimeController;
@@ -29,72 +29,60 @@
*/
public class P4RuntimeHandshaker extends AbstractP4RuntimeHandlerBehaviour implements DeviceHandshaker {
- // TODO: consider abstract class with empty connect method and implementation into a protected one for reusability.
-
@Override
public CompletableFuture<Boolean> connect() {
- return CompletableFuture.supplyAsync(this::doConnect);
- }
-
- private boolean doConnect() {
- return super.createClient();
+ return CompletableFuture
+ .supplyAsync(super::createClient)
+ .thenCompose(client -> {
+ if (client == null) {
+ return CompletableFuture.completedFuture(false);
+ }
+ return client.start();
+ });
}
@Override
public CompletableFuture<Boolean> disconnect() {
- return CompletableFuture.supplyAsync(() -> {
- P4RuntimeController controller = handler().get(P4RuntimeController.class);
- DeviceId deviceId = handler().data().deviceId();
- controller.removeClient(deviceId);
- return true;
- });
+ final P4RuntimeController controller = handler().get(P4RuntimeController.class);
+ final DeviceId deviceId = handler().data().deviceId();
+ if (!controller.hasClient(deviceId)) {
+ return CompletableFuture.completedFuture(true);
+ } else {
+ return controller.getClient(deviceId).shutdown()
+ .thenApplyAsync(v -> {
+ controller.removeClient(deviceId);
+ return true;
+ });
+ }
}
@Override
public CompletableFuture<Boolean> isReachable() {
- return CompletableFuture.supplyAsync(() -> {
- P4RuntimeController controller = handler().get(P4RuntimeController.class);
- DeviceId deviceId = handler().data().deviceId();
- return controller.isReacheable(deviceId);
- });
+ return CompletableFuture.supplyAsync(() -> handler()
+ .get(P4RuntimeController.class)
+ .isReacheable(handler().data().deviceId())
+ );
}
@Override
- public CompletableFuture<MastershipRole> roleChanged(MastershipRole newRole) {
- deviceId = handler().data().deviceId();
- controller = handler().get(P4RuntimeController.class);
- CompletableFuture<MastershipRole> result = new CompletableFuture<>();
-
- client = controller.getClient(deviceId);
- if (client == null || !controller.isReacheable(deviceId)) {
- result.complete(MastershipRole.NONE);
- return result;
- }
- if (newRole.equals(MastershipRole.MASTER)) {
- client.sendMasterArbitrationUpdate().thenAcceptAsync(success -> {
- if (!success) {
- log.warn("Device {} arbitration failed", deviceId);
- result.complete(MastershipRole.STANDBY);
- } else {
- result.complete(MastershipRole.MASTER);
+ public void roleChanged(MastershipRole newRole) {
+ if (setupBehaviour() && newRole.equals(MastershipRole.MASTER)) {
+ client.becomeMaster().thenAcceptAsync(result -> {
+ if (!result) {
+ log.error("Unable to notify mastership role {} to {}",
+ newRole, deviceId);
}
});
- } else {
- // Since we don't need to do anything, we can complete it directly
- // Spec: The client with the highest election id is referred to as the
- // "master", while all other clients are referred to as "slaves".
- result.complete(newRole);
}
- return result;
}
@Override
- public void addChannelListener(ChannelListener listener) {
- controller.addChannelListener(deviceId, listener);
+ public void addDeviceAgentListener(DeviceAgentListener listener) {
+ controller.addDeviceAgentListener(deviceId, listener);
}
@Override
- public void removeChannelListener(ChannelListener listener) {
- controller.removeChannelListener(deviceId, listener);
+ public void removeDeviceAgentListener(DeviceAgentListener listener) {
+ controller.removeDeviceAgentListener(deviceId, listener);
}
}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
index d7a204e..39ed84d 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
@@ -16,9 +16,9 @@
package org.onosproject.drivers.p4runtime;
-import com.google.common.cache.LoadingCache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMeterMirror;
import org.onosproject.net.meter.Band;
import org.onosproject.net.meter.DefaultBand;
@@ -37,11 +37,11 @@
import java.util.Collection;
import java.util.Collections;
-import java.util.Set;
import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -108,18 +108,13 @@
final PiMeterHandle handle = PiMeterHandle.of(deviceId, piMeterCellConfig);
ENTRY_LOCKS.getUnchecked(handle).lock();
- boolean result = false;
- try {
- if (client.writeMeterCells(newArrayList(piMeterCellConfig), pipeconf).get()) {
- meterMirror.put(handle, piMeterCellConfig);
- result = true;
- }
-
- } catch (InterruptedException | ExecutionException e) {
- log.warn("Exception while modify meter entry:", e);
- } finally {
- ENTRY_LOCKS.getUnchecked(handle).unlock();
+ final boolean result = getFutureWithDeadline(
+ client.writeMeterCells(newArrayList(piMeterCellConfig), pipeconf),
+ "writing meter cells", false);
+ if (result) {
+ meterMirror.put(handle, piMeterCellConfig);
}
+ ENTRY_LOCKS.getUnchecked(handle).unlock();
return result;
}
@@ -162,4 +157,4 @@
return CompletableFuture.completedFuture(meters);
}
-}
\ No newline at end of file
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
index 6c61148..cc79cc1 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
@@ -26,7 +26,9 @@
/**
* Implementation of PacketProgrammable behaviour for P4Runtime.
*/
-public class P4RuntimePacketProgrammable extends AbstractP4RuntimeHandlerBehaviour implements PacketProgrammable {
+public class P4RuntimePacketProgrammable
+ extends AbstractP4RuntimeHandlerBehaviour
+ implements PacketProgrammable {
@Override
public void emit(OutboundPacket packet) {
@@ -38,7 +40,8 @@
final PiPipelineInterpreter interpreter = device.is(PiPipelineInterpreter.class)
? device.as(PiPipelineInterpreter.class) : null;
if (interpreter == null) {
- log.warn("Device {} with pipeconf {} has no interpreter, aborting emit operation", deviceId, pipeconf.id());
+ log.warn("Unable to get interpreter for {} with pipeconf {}, aborting emit operation",
+ deviceId, pipeconf.id());
return;
}
@@ -46,11 +49,13 @@
Collection<PiPacketOperation> operations = interpreter.mapOutboundPacket(packet);
operations.forEach(piPacketOperation -> {
log.debug("Doing PiPacketOperation {}", piPacketOperation);
- client.packetOut(piPacketOperation, pipeconf);
+ getFutureWithDeadline(
+ client.packetOut(piPacketOperation, pipeconf),
+ "sending packet-out", false);
});
} catch (PiPipelineInterpreter.PiInterpreterException e) {
- log.error("Interpreter of pipeconf {} was unable to translate outbound packet: {}",
- pipeconf.id(), e.getMessage());
+ log.error("Unable to translate outbound packet for {} with pipeconf {}: {}",
+ deviceId, pipeconf.id(), e.getMessage());
}
}
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
index 0bf61f2..a9a6e08 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
@@ -53,99 +53,135 @@
}
/**
- * Sets the device pipeline according to the given pipeconf, and for the given byte buffer representing the
- * target-specific data to be used in the P4Runtime's SetPipelineConfig message. This method should be called
+ * Starts the client by starting the Stream RPC with the device.
+ *
+ * @return completable future containing true if the operation was
+ * successful, false otherwise.
+ */
+ CompletableFuture<Boolean> start();
+
+ /**
+ * Shutdowns the client by terminating any active RPC such as the Stream
+ * one.
+ *
+ * @return a completable future to signal the completion of the shutdown
+ * procedure
+ */
+ CompletableFuture<Void> shutdown();
+
+ /**
+ * Sends a master arbitration update to the device with a new election ID
+ * that is guaranteed to be the highest value between all clients.
+ *
+ * @return completable future containing true if the operation was
+ * successful; false otherwise
+ */
+ CompletableFuture<Boolean> becomeMaster();
+
+ /**
+ * Sets the device pipeline according to the given pipeconf, and for the
+ * given byte buffer representing the target-specific data to be used in the
+ * P4Runtime's SetPipelineConfig message. This method should be called
* before any other method of this client.
*
* @param pipeconf pipeconf
* @param deviceData target-specific data
- * @return a completable future of a boolean, true if the operations was successful, false otherwise.
+ * @return a completable future of a boolean, true if the operations was
+ * successful, false otherwise.
*/
- CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData);
+ CompletableFuture<Boolean> setPipelineConfig(
+ PiPipeconf pipeconf, ByteBuffer deviceData);
/**
- * Initializes the stream channel, after which all messages received from the device will be notified using the
- * {@link P4RuntimeController} event listener.
- *
- * @return a completable future of a boolean, true if the operations was successful, false otherwise.
- */
- CompletableFuture<Boolean> initStreamChannel();
-
- /**
- * Performs the given write operation for the given table entries and pipeconf.
+ * Performs the given write operation for the given table entries and
+ * pipeconf.
*
* @param entries table entries
* @param opType operation type
* @param pipeconf pipeconf currently deployed on the device
* @return true if the operation was successful, false otherwise.
*/
- CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> entries, WriteOperationType opType,
- PiPipeconf pipeconf);
+ CompletableFuture<Boolean> writeTableEntries(
+ Collection<PiTableEntry> entries, WriteOperationType opType,
+ PiPipeconf pipeconf);
/**
- * Dumps all entries currently installed in the given table, for the given pipeconf.
+ * Dumps all entries currently installed in the given table, for the given
+ * pipeconf.
*
* @param tableId table identifier
* @param pipeconf pipeconf currently deployed on the device
* @return completable future of a collection of table entries
*/
- CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId tableId, PiPipeconf pipeconf);
+ CompletableFuture<Collection<PiTableEntry>> dumpTable(
+ PiTableId tableId, PiPipeconf pipeconf);
+
+ /**
+ * Dumps entries from all tables, for the given pipeconf.
+ *
+ * @param pipeconf pipeconf currently deployed on the device
+ * @return completable future of a collection of table entries
+ */
+ CompletableFuture<Collection<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf);
/**
* Executes a packet-out operation for the given pipeconf.
*
* @param packet packet-out operation to be performed by the device
* @param pipeconf pipeconf currently deployed on the device
- * @return a completable future of a boolean, true if the operations was successful, false otherwise.
+ * @return a completable future of a boolean, true if the operations was
+ * successful, false otherwise.
*/
- CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf);
+ CompletableFuture<Boolean> packetOut(
+ PiPacketOperation packet, PiPipeconf pipeconf);
/**
- * Returns the value of all counter cells for the given set of counter identifiers and pipeconf.
+ * Returns the value of all counter cells for the given set of counter
+ * identifiers and pipeconf.
*
* @param counterIds counter identifiers
* @param pipeconf pipeconf
* @return collection of counter data
*/
- CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
- PiPipeconf pipeconf);
+ CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(
+ Set<PiCounterId> counterIds, PiPipeconf pipeconf);
/**
- * Returns a collection of counter data corresponding to the given set of counter cell identifiers, for the given
- * pipeconf.
+ * Returns a collection of counter data corresponding to the given set of
+ * counter cell identifiers, for the given pipeconf.
*
* @param cellIds set of counter cell identifiers
* @param pipeconf pipeconf
* @return collection of counter data
*/
- CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
- PiPipeconf pipeconf);
+ CompletableFuture<Collection<PiCounterCellData>> readCounterCells(
+ Set<PiCounterCellId> cellIds, PiPipeconf pipeconf);
/**
- * Performs the given write operation for the given action group members and pipeconf.
+ * Performs the given write operation for the given action group members and
+ * pipeconf.
*
- * @param profileId action group profile ID
- * @param members action group members
- * @param opType write operation type
- * @param pipeconf the pipeconf currently deployed on the device
+ * @param profileId action group profile ID
+ * @param members action group members
+ * @param opType write operation type
+ * @param pipeconf the pipeconf currently deployed on the device
* @return true if the operation was successful, false otherwise
*/
- CompletableFuture<Boolean> writeActionGroupMembers(PiActionProfileId profileId,
- Collection<PiActionGroupMember> members,
- WriteOperationType opType,
- PiPipeconf pipeconf);
+ CompletableFuture<Boolean> writeActionGroupMembers(
+ PiActionProfileId profileId, Collection<PiActionGroupMember> members,
+ WriteOperationType opType, PiPipeconf pipeconf);
/**
- * Performs the given write operation for the given action group and pipeconf.
+ * Performs the given write operation for the given action group and
+ * pipeconf.
*
* @param group the action group
* @param opType write operation type
* @param pipeconf the pipeconf currently deployed on the device
* @return true if the operation was successful, false otherwise
*/
- CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
- WriteOperationType opType,
- PiPipeconf pipeconf);
+ CompletableFuture<Boolean> writeActionGroup(
+ PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf);
/**
* Dumps all groups currently installed for the given action profile.
@@ -154,50 +190,39 @@
* @param pipeconf the pipeconf currently deployed on the device
* @return completable future of a collection of groups
*/
- CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
- PiPipeconf pipeconf);
+ CompletableFuture<Collection<PiActionGroup>> dumpGroups(
+ PiActionProfileId actionProfileId, PiPipeconf pipeconf);
/**
- * Returns the configuration of all meter cells for the given set of meter identifiers and pipeconf.
+ * Returns the configuration of all meter cells for the given set of meter
+ * identifiers and pipeconf.
*
- * @param meterIds meter identifiers
- * @param pipeconf pipeconf
+ * @param meterIds meter identifiers
+ * @param pipeconf pipeconf
* @return collection of meter configurations
*/
- CompletableFuture<Collection<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
- PiPipeconf pipeconf);
+ CompletableFuture<Collection<PiMeterCellConfig>> readAllMeterCells(
+ Set<PiMeterId> meterIds, PiPipeconf pipeconf);
/**
- * Returns a collection of meter configurations corresponding to the given set of meter cell identifiers,
- * for the given pipeconf.
+ * Returns a collection of meter configurations corresponding to the given
+ * set of meter cell identifiers, for the given pipeconf.
*
- * @param cellIds set of meter cell identifiers
- * @param pipeconf pipeconf
+ * @param cellIds set of meter cell identifiers
+ * @param pipeconf pipeconf
* @return collection of meter configrations
*/
- CompletableFuture<Collection<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
- PiPipeconf pipeconf);
+ CompletableFuture<Collection<PiMeterCellConfig>> readMeterCells(
+ Set<PiMeterCellId> cellIds, PiPipeconf pipeconf);
/**
- * Performs a write operation for the given meter configurations and pipeconf.
+ * Performs a write operation for the given meter configurations and
+ * pipeconf.
*
- * @param cellConfigs meter cell configurations
- * @param pipeconf pipeconf currently deployed on the device
+ * @param cellConfigs meter cell configurations
+ * @param pipeconf pipeconf currently deployed on the device
* @return true if the operation was successful, false otherwise.
*/
- CompletableFuture<Boolean> writeMeterCells(Collection<PiMeterCellConfig> cellConfigs, PiPipeconf pipeconf);
-
- /**
- * Shutdown the client by terminating any active RPC such as the stream channel.
- */
- void shutdown();
-
- /**
- * Sends a master arbitration update to the device.
- *
- * @return a completable future containing true if the operation was successful; false otherwise
- */
- CompletableFuture<Boolean> sendMasterArbitrationUpdate();
-
- // TODO: work in progress.
+ CompletableFuture<Boolean> writeMeterCells(
+ Collection<PiMeterCellConfig> cellConfigs, PiPipeconf pipeconf);
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 3cd81f2..fc76a23 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -19,13 +19,14 @@
import com.google.common.annotations.Beta;
import org.onosproject.event.ListenerService;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.ChannelListener;
+import org.onosproject.net.device.DeviceAgentListener;
/**
* Controller of P4Runtime devices.
*/
@Beta
-public interface P4RuntimeController extends ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
+public interface P4RuntimeController
+ extends ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
/**
* Instantiates a new client to operate on a P4Runtime device identified by
@@ -58,20 +59,22 @@
*
* @param deviceId device identifier
* @return client instance
- * @throws IllegalStateException if no client exists for the given device identifier
+ * @throws IllegalStateException if no client exists for the given device
+ * identifier
*/
P4RuntimeClient getClient(DeviceId deviceId);
/**
- * Removes the client for the given device. If no client exists for the given device identifier, the
- * result is a no-op.
+ * Removes the client for the given device. If no client exists for the
+ * given device identifier, the result is a no-op.
*
* @param deviceId device identifier
*/
void removeClient(DeviceId deviceId);
/**
- * Returns true if a client exists for the given device identifier, false otherwise.
+ * Returns true if a client exists for the given device identifier, false
+ * otherwise.
*
* @param deviceId device identifier
* @return true if client exists, false otherwise.
@@ -79,37 +82,31 @@
boolean hasClient(DeviceId deviceId);
/**
- * Returns true if the P4Runtime server running on the given device is reachable, i.e. the channel is open and the
- * server is able to respond to RPCs, false otherwise. Reachability can be tested only if a client was previously
- * created using {@link #createClient(DeviceId, String, int, long)}, otherwise this method returns false.
+ * Returns true if the P4Runtime server running on the given device is
+ * reachable, i.e. the channel is open and the server is able to respond to
+ * RPCs, false otherwise. Reachability can be tested only if a client was
+ * previously created using {@link #createClient(DeviceId, String, int,
+ * long)}, otherwise this method returns false.
*
* @param deviceId device identifier.
- * @return true if a client was created and is able to contact the P4Runtime server, false otherwise.
+ * @return true if a client was created and is able to contact the P4Runtime
+ * server, false otherwise.
*/
boolean isReacheable(DeviceId deviceId);
/**
- * Gets new election id for device arbitration request.
- *
- * @return the election id
- */
- long getNewMasterElectionId();
-
- /**
- * Adds a listener for P4Runtime client-server channel events.
- * If the channel for the device is not present and/or established the listener will get notified
- * only after channel setup.
+ * Adds a listener for device agent events.
*
* @param deviceId device identifier
- * @param listener the channel listener
+ * @param listener the device agent listener
*/
- void addChannelListener(DeviceId deviceId, ChannelListener listener);
+ void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener);
/**
- * Removes the listener for P4Runtime client-server channel events.
+ * Removes the listener for device agent events.
*
* @param deviceId device identifier
- * @param listener the channel listener
+ * @param listener the device agent listener
*/
- void removeChannelListener(DeviceId deviceId, ChannelListener listener);
+ void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener);
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
index ae29097..0316ed4 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
@@ -37,7 +37,7 @@
/**
* Arbitration reply.
*/
- ARBITRATION,
+ ARBITRATION_RESPONSE,
/**
* Channel Event.
diff --git a/protocols/p4runtime/ctl/BUCK b/protocols/p4runtime/ctl/BUCK
index cc62acb..46540b3 100644
--- a/protocols/p4runtime/ctl/BUCK
+++ b/protocols/p4runtime/ctl/BUCK
@@ -3,6 +3,7 @@
COMPILE_DEPS = [
'//lib:CORE_DEPS',
+ '//lib:KRYO',
'//protocols/grpc/api:onos-protocols-grpc-api',
'//protocols/p4runtime/api:onos-protocols-p4runtime-api',
'//protocols/p4runtime/proto:onos-protocols-p4runtime-proto',
@@ -10,6 +11,7 @@
'//lib:grpc-stub-' + GRPC_VER,
'//lib:grpc-netty-' + GRPC_VER,
'//lib:protobuf-java-' + PROTOBUF_VER,
+ '//core/store/serializers:onos-core-serializers',
]
TEST_DEPS = [
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java
index ba31e69..4dbcac3 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java
@@ -31,7 +31,7 @@
/**
* Encoder/Decoder of action profile member.
*/
-public final class ActionProfileMemberEncoder {
+final class ActionProfileMemberEncoder {
private ActionProfileMemberEncoder() {
// Hide default constructor
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ArbitrationResponse.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ArbitrationResponse.java
new file mode 100644
index 0000000..ea3c24f
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ArbitrationResponse.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+
+/**
+ * Default implementation of arbitration in P4Runtime.
+ */
+class ArbitrationResponse implements P4RuntimeEventSubject {
+
+ private DeviceId deviceId;
+ private boolean isMaster;
+
+ /**
+ * Creates arbitration with given role and master flag.
+ *
+ * @param deviceId the device
+ * @param isMaster true if arbitration response signals master status
+ */
+ ArbitrationResponse(DeviceId deviceId, boolean isMaster) {
+ this.deviceId = deviceId;
+ this.isMaster = isMaster;
+ }
+
+ /**
+ * Returns true if arbitration response signals master status, false
+ * otherwise.
+ *
+ * @return boolean flag
+ */
+ boolean isMaster() {
+ return isMaster;
+ }
+
+ @Override
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ChannelEvent.java
new file mode 100644
index 0000000..6e33514
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ChannelEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+
+/**
+ * Channel event in P4Runtime.
+ */
+final class ChannelEvent implements P4RuntimeEventSubject {
+
+ enum Type {
+ OPEN,
+ CLOSED,
+ ERROR
+ }
+
+ private DeviceId deviceId;
+ private Type type;
+
+ /**
+ * Creates channel event with given status and throwable.
+ *
+ * @param deviceId the device
+ * @param type error type
+ */
+ ChannelEvent(DeviceId deviceId, Type type) {
+ this.deviceId = deviceId;
+ this.type = type;
+ }
+
+ /**
+ * Gets the type of this event.
+ *
+ * @return the error type
+ */
+ public Type type() {
+ return type;
+ }
+
+ @Override
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
deleted file mode 100644
index e8b8658..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
-import p4.v1.P4RuntimeOuterClass.Uint128;
-
-/**
- * Default implementation of arbitration in P4Runtime.
- */
-public class DefaultArbitration implements P4RuntimeEventSubject {
- private MastershipRole role;
- private Uint128 electionId;
- private DeviceId deviceId;
-
- /**
- * Creates arbitration with given role and election id.
- *
- * @param deviceId the device
- * @param role the role
- * @param electionId the election id
- */
- DefaultArbitration(DeviceId deviceId, MastershipRole role, Uint128 electionId) {
- this.deviceId = deviceId;
- this.role = role;
- this.electionId = electionId;
- }
-
- /**
- * Gets the role of this arbitration.
- *
- * @return the role
- */
- public MastershipRole role() {
- return role;
- }
-
- /**
- * Gets election id of this arbitration.
- *
- * @return the election id
- */
- public Uint128 electionId() {
- return electionId;
- }
-
- @Override
- public DeviceId deviceId() {
- return deviceId;
- }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java
deleted file mode 100644
index 5f5b3b9..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.ChannelEvent.Type;
-import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
-
-/**
- * Default implementation of channel event in P4Runtime. It allows passing any type of event.
- * If the event is an error a throwable can be directly passed.
- * Any other type of event cause can be passed as string.
- */
-public class DefaultChannelEvent implements P4RuntimeEventSubject {
- private DeviceId deviceId;
- private Type type;
- private Throwable throwable;
- private String message;
-
- /**
- * Creates channel event with given status and throwable.
- *
- * @param deviceId the device
- * @param type error type
- * @param throwable the cause
- */
- public DefaultChannelEvent(DeviceId deviceId, Type type, Throwable throwable) {
- this.deviceId = deviceId;
- this.type = type;
- this.message = throwable.getMessage();
- this.throwable = throwable;
- }
-
- /**
- * Creates channel event with given status and string cause.
- *
- * @param deviceId the device
- * @param type error type
- * @param message the message
- */
- public DefaultChannelEvent(DeviceId deviceId, Type type, String message) {
- this.deviceId = deviceId;
- this.type = type;
- this.message = message;
- this.throwable = null;
- }
-
- /**
- * Creates channel event with given status, cause and throwable.
- *
- * @param deviceId the device
- * @param type error type
- * @param message the message
- * @param throwable the cause
- */
- public DefaultChannelEvent(DeviceId deviceId, Type type, String message, Throwable throwable) {
- this.deviceId = deviceId;
- this.type = type;
- this.message = message;
- this.throwable = throwable;
- }
-
- /**
- * Gets the type of this event.
- *
- * @return the error type
- */
- public Type type() {
- return type;
- }
-
- /**
- * Gets the message related to this event.
- *
- * @return the message
- */
- public String message() {
- return message;
- }
-
-
- /**
- * Gets throwable of this event.
- * If no throwable is present returns null.
- *
- * @return the throwable
- */
- public Throwable throwable() {
- return throwable;
- }
-
- @Override
- public DeviceId deviceId() {
- return deviceId;
- }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DistributedElectionIdGenerator.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DistributedElectionIdGenerator.java
new file mode 100644
index 0000000..ecfe35d
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DistributedElectionIdGenerator.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicCounterMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Distributed implementation of a generator of P4Runtime election IDs.
+ */
+class DistributedElectionIdGenerator {
+
+ private final Logger log = getLogger(this.getClass());
+
+ private AtomicCounterMap<DeviceId> electionIds;
+
+ /**
+ * Creates a new election ID generator using the given storage service.
+ *
+ * @param storageService storage service
+ */
+ DistributedElectionIdGenerator(StorageService storageService) {
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .build();
+ this.electionIds = storageService.<DeviceId>atomicCounterMapBuilder()
+ .withName("p4runtime-election-ids")
+ .withSerializer(Serializer.using(serializer))
+ .build();
+ }
+
+ /**
+ * Returns an election ID for the given device ID. The first election ID for
+ * a given device ID is always 1.
+ *
+ * @param deviceId device ID
+ * @return new election ID
+ */
+ BigInteger generate(DeviceId deviceId) {
+ if (electionIds == null) {
+ return null;
+ }
+ // Default value is 0 for AtomicCounterMap.
+ return BigInteger.valueOf(electionIds.incrementAndGet(deviceId));
+ }
+
+ /**
+ * Destroy the backing distributed primitive of this generator.
+ */
+ void destroy() {
+ try {
+ electionIds.destroy().get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.error("Exception while destroying distributed counter map", e);
+ } finally {
+ electionIds = null;
+ }
+ }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 43d01e5..c952e61 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -19,7 +19,6 @@
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -30,10 +29,9 @@
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.onosproject.net.device.ChannelEvent;
import org.onosproject.net.pi.model.PiActionProfileId;
import org.onosproject.net.pi.model.PiCounterId;
import org.onosproject.net.pi.model.PiMeterId;
@@ -52,6 +50,8 @@
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.slf4j.Logger;
+import p4.config.v1.P4InfoOuterClass.P4Info;
+import p4.tmp.P4Config;
import p4.v1.P4RuntimeGrpc;
import p4.v1.P4RuntimeOuterClass;
import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
@@ -59,7 +59,6 @@
import p4.v1.P4RuntimeOuterClass.Entity;
import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
-import p4.v1.P4RuntimeOuterClass.PacketIn;
import p4.v1.P4RuntimeOuterClass.ReadRequest;
import p4.v1.P4RuntimeOuterClass.ReadResponse;
import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
@@ -69,9 +68,8 @@
import p4.v1.P4RuntimeOuterClass.Uint128;
import p4.v1.P4RuntimeOuterClass.Update;
import p4.v1.P4RuntimeOuterClass.WriteRequest;
-import p4.config.v1.P4InfoOuterClass.P4Info;
-import p4.tmp.P4Config;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
@@ -81,7 +79,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -99,13 +96,17 @@
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
+import static p4.v1.P4RuntimeOuterClass.PacketIn;
import static p4.v1.P4RuntimeOuterClass.PacketOut;
import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
/**
* Implementation of a P4Runtime client.
*/
-public final class P4RuntimeClientImpl implements P4RuntimeClient {
+final class P4RuntimeClientImpl implements P4RuntimeClient {
+
+ // Timeout in seconds to obtain the client lock.
+ private static final int LOCK_TIMEOUT = 10;
private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
@@ -116,18 +117,20 @@
private final Logger log = getLogger(getClass());
+ private final Lock requestLock = new ReentrantLock();
+ private final Context.CancellableContext cancellableContext =
+ Context.current().withCancellation();
+
private final DeviceId deviceId;
private final long p4DeviceId;
private final P4RuntimeControllerImpl controller;
private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
- private final Context.CancellableContext cancellableContext;
private final ExecutorService executorService;
private final Executor contextExecutor;
- private final Lock writeLock = new ReentrantLock();
private final StreamObserver<StreamMessageRequest> streamRequestObserver;
- private Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
- protected Uint128 p4RuntimeElectionId;
+ // Used by this client for write requests.
+ private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
/**
* Default constructor.
@@ -142,45 +145,77 @@
this.deviceId = deviceId;
this.p4DeviceId = p4DeviceId;
this.controller = controller;
- this.cancellableContext = Context.current().withCancellation();
this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
- "onos/p4runtime-client-" + deviceId.toString(),
- deviceId.toString() + "-%d"));
+ "onos-p4runtime-client-" + deviceId.toString(), "%d"));
this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
- //TODO Investigate deadline or timeout in supplyInContext Method
+ //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
- P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
- this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
+ this.streamRequestObserver = P4RuntimeGrpc.newStub(channel)
+ .streamChannel(new StreamChannelResponseObserver());
}
/**
- * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
- * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
- * <p>
- * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
- * <p>
+ * Submits a task for async execution via the given executor.
+ * All tasks submitted with this method will be executed sequentially.
*/
- private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
+ private <U> CompletableFuture<U> supplyWithExecutor(
+ Supplier<U> supplier, String opDescription, Executor executor) {
return CompletableFuture.supplyAsync(() -> {
// TODO: explore a more relaxed locking strategy.
- writeLock.lock();
+ try {
+ if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
+ log.error("LOCK TIMEOUT! This is likely a deadlock, "
+ + "please debug (executing {})",
+ opDescription);
+ throw new IllegalThreadStateException("Lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.warn("Thread interrupted while waiting for lock (executing {})",
+ opDescription);
+ throw new RuntimeException(e);
+ }
try {
return supplier.get();
} catch (StatusRuntimeException ex) {
- log.warn("Unable to execute {} on {}: {}", opDescription, deviceId, ex.toString());
+ log.warn("Unable to execute {} on {}: {}",
+ opDescription, deviceId, ex.toString());
throw ex;
} catch (Throwable ex) {
- log.error("Exception in client of {}, executing {}", deviceId, opDescription, ex);
+ log.error("Exception in client of {}, executing {}",
+ deviceId, opDescription, ex);
throw ex;
} finally {
- writeLock.unlock();
+ requestLock.unlock();
}
- }, contextExecutor);
+ }, executor);
+ }
+
+ /**
+ * Equivalent of supplyWithExecutor using the gRPC context executor of this
+ * client, such that if the context is cancelled (e.g. client shutdown) the
+ * RPC is automatically cancelled.
+ */
+ private <U> CompletableFuture<U> supplyInContext(
+ Supplier<U> supplier, String opDescription) {
+ return supplyWithExecutor(supplier, opDescription, contextExecutor);
}
@Override
- public CompletableFuture<Boolean> initStreamChannel() {
- return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
+ public CompletableFuture<Boolean> start() {
+ return supplyInContext(this::doInitStreamChannel,
+ "start-initStreamChannel");
+ }
+
+ @Override
+ public CompletableFuture<Void> shutdown() {
+ return supplyWithExecutor(this::doShutdown, "shutdown",
+ SharedExecutors.getPoolThreadExecutor());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> becomeMaster() {
+ return supplyInContext(this::doBecomeMaster,
+ "becomeMaster");
}
@Override
@@ -201,6 +236,11 @@
}
@Override
+ public CompletableFuture<Collection<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf) {
+ return supplyInContext(() -> doDumpTable(null, pipeconf), "dumpAllTables");
+ }
+
+ @Override
public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
}
@@ -245,11 +285,6 @@
}
@Override
- public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
- return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
- }
-
- @Override
public CompletableFuture<Boolean> writeMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
@@ -272,42 +307,48 @@
/* Blocking method implementations below */
- private boolean doArbitrationUpdate() {
+ private boolean doBecomeMaster() {
+ final Uint128 newId = bigIntegerToUint128(
+ controller.newMasterElectionId(deviceId));
+ if (sendMasterArbitrationUpdate(newId)) {
+ clientElectionId = newId;
+ return true;
+ }
+ return false;
+ }
- CompletableFuture<Boolean> result = new CompletableFuture<>();
- // TODO: currently we use 64-bit Long type for election id, should
- // we use 128-bit ?
- long nextElectId = controller.getNewMasterElectionId();
- Uint128 newElectionId = Uint128.newBuilder()
- .setLow(nextElectId)
- .build();
- MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
- .setDeviceId(p4DeviceId)
- .setElectionId(newElectionId)
- .build();
- StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
- .setArbitration(arbitrationUpdate)
- .build();
- log.debug("Sending arbitration update to {} with election id {}...",
- deviceId, newElectionId);
- arbitrationUpdateMap.put(newElectionId, result);
+ private boolean sendMasterArbitrationUpdate(Uint128 electionId) {
+ log.info("Sending arbitration update to {}... electionId={}",
+ deviceId, uint128ToBigInteger(electionId));
try {
- streamRequestObserver.onNext(requestMsg);
- return result.get();
+ streamRequestObserver.onNext(
+ StreamMessageRequest.newBuilder()
+ .setArbitration(
+ MasterArbitrationUpdate
+ .newBuilder()
+ .setDeviceId(p4DeviceId)
+ .setElectionId(electionId)
+ .build())
+ .build());
+ return true;
} catch (StatusRuntimeException e) {
log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
- arbitrationUpdateMap.remove(newElectionId);
- return false;
- } catch (InterruptedException | ExecutionException e) {
- log.warn("Arbitration update failed for {} due to {}", deviceId, e);
- arbitrationUpdateMap.remove(newElectionId);
- return false;
}
+ return false;
}
+
private boolean doInitStreamChannel() {
// To listen for packets and other events, we need to start the RPC.
- // Here we do it by sending a master arbitration update.
- return doArbitrationUpdate();
+ // Here we send an empty StreamMessageRequest.
+ try {
+ log.info("Starting stream channel with {}...", deviceId);
+ streamRequestObserver.onNext(StreamMessageRequest.newBuilder().build());
+ return true;
+ } catch (StatusRuntimeException e) {
+ log.error("Unable to start stream channel with {}: {}",
+ deviceId, e.getMessage());
+ return false;
+ }
}
private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
@@ -338,7 +379,7 @@
SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
.newBuilder()
.setDeviceId(p4DeviceId)
- .setElectionId(p4RuntimeElectionId)
+ .setElectionId(clientElectionId)
.setAction(VERIFY_AND_COMMIT)
.setConfig(pipelineConfig)
.build();
@@ -380,7 +421,7 @@
writeRequestBuilder
.setDeviceId(p4DeviceId)
- .setElectionId(p4RuntimeElectionId)
+ .setElectionId(clientElectionId)
.addAllUpdates(updateMsgs)
.build();
@@ -388,7 +429,7 @@
blockingStub.write(writeRequestBuilder.build());
return true;
} catch (StatusRuntimeException e) {
- logWriteErrors(piTableEntries, e, opType, "table entry");
+ checkAndLogWriteErrors(piTableEntries, e, opType, "table entry");
return false;
}
}
@@ -397,13 +438,18 @@
log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
- P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
int tableId;
- try {
- tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
- } catch (P4InfoBrowser.NotFoundException e) {
- log.warn("Unable to dump table: {}", e.getMessage());
- return Collections.emptyList();
+ if (piTableId == null) {
+ // Dump all tables.
+ tableId = 0;
+ } else {
+ P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
+ try {
+ tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
+ } catch (P4InfoBrowser.NotFoundException e) {
+ log.warn("Unable to dump table: {}", e.getMessage());
+ return Collections.emptyList();
+ }
}
ReadRequest requestMsg = ReadRequest.newBuilder()
@@ -474,40 +520,29 @@
}
// Decode packet message and post event.
PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
- DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
+ PacketInEvent packetInEventSubject = new PacketInEvent(deviceId, packetOperation);
P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
log.debug("Received packet in: {}", event);
controller.postEvent(event);
}
- private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
- log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
-
- Uint128 electionId = arbitrationMsg.getElectionId();
- CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
-
- if (mastershipFeature == null) {
- log.warn("Can't find completable future of election id {}", electionId);
+ private void doArbitrationResponse(MasterArbitrationUpdate msg) {
+ // From the spec...
+ // - Election_id: The stream RPC with the highest election_id is the
+ // master. Switch populates with the highest election ID it
+ // has received from all connected controllers.
+ // - Status: Switch populates this with OK for the client that is the
+ // master, and with an error status for all other connected clients (at
+ // every mastership change).
+ if (!msg.hasElectionId() || !msg.hasStatus()) {
return;
}
-
- this.p4RuntimeElectionId = electionId;
- int statusCode = arbitrationMsg.getStatus().getCode();
- MastershipRole arbitrationRole;
- // arbitration update success
-
- if (statusCode == Status.OK.getCode().value()) {
- mastershipFeature.complete(true);
- arbitrationRole = MastershipRole.MASTER;
- } else {
- mastershipFeature.complete(false);
- arbitrationRole = MastershipRole.STANDBY;
- }
-
- DefaultArbitration arbitrationEventSubject = new DefaultArbitration(deviceId, arbitrationRole, electionId);
- P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
- arbitrationEventSubject);
- controller.postEvent(event);
+ final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
+ log.info("Received arbitration update from {}: isMaster={}, electionId={}",
+ deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
+ controller.postEvent(new P4RuntimeEvent(
+ P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
+ new ArbitrationResponse(deviceId, isMaster)));
}
private Collection<PiCounterCellData> doReadAllCounterCells(
@@ -583,14 +618,14 @@
WriteRequest writeRequestMsg = WriteRequest.newBuilder()
.setDeviceId(p4DeviceId)
- .setElectionId(p4RuntimeElectionId)
+ .setElectionId(clientElectionId)
.addAllUpdates(updateMsgs)
.build();
try {
blockingStub.write(writeRequestMsg);
return true;
} catch (StatusRuntimeException e) {
- logWriteErrors(members, e, opType, "group member");
+ checkAndLogWriteErrors(members, e, opType, "group member");
return false;
}
}
@@ -729,7 +764,7 @@
final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
.setDeviceId(p4DeviceId)
- .setElectionId(p4RuntimeElectionId)
+ .setElectionId(clientElectionId)
.addUpdates(Update.newBuilder()
.setEntity(Entity.newBuilder()
.setActionProfileGroup(actionProfileGroup)
@@ -741,7 +776,7 @@
blockingStub.write(writeRequestMsg);
return true;
} catch (StatusRuntimeException e) {
- logWriteErrors(Collections.singleton(group), e, opType, "group");
+ checkAndLogWriteErrors(Collections.singleton(group), e, opType, "group");
return false;
}
}
@@ -814,7 +849,7 @@
writeRequestBuilder
.setDeviceId(p4DeviceId)
- .setElectionId(p4RuntimeElectionId)
+ .setElectionId(clientElectionId)
.addAllUpdates(updateMsgs)
.build();
try {
@@ -827,53 +862,34 @@
}
}
- /**
- * Returns the internal P4 device ID associated with this client.
- *
- * @return P4 device ID
- */
- public long p4DeviceId() {
- return p4DeviceId;
- }
-
- /**
- * For testing purpose only. TODO: remove before release.
- *
- * @return blocking stub
- */
- public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
- return this.blockingStub;
- }
-
-
- @Override
- public void shutdown() {
-
+ private Void doShutdown() {
log.info("Shutting down client for {}...", deviceId);
-
- writeLock.lock();
- try {
- if (streamRequestObserver != null) {
- streamRequestObserver.onCompleted();
- cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
- }
-
- this.executorService.shutdown();
+ if (streamRequestObserver != null) {
try {
- executorService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("Executor service didn't shutdown in time.");
- Thread.currentThread().interrupt();
+ streamRequestObserver.onCompleted();
+ } catch (IllegalStateException e) {
+ // Thrown if stream channel is already completed. Can ignore.
+ log.debug("Ignored expection: {}", e);
}
- } finally {
- writeLock.unlock();
+ cancellableContext.cancel(new InterruptedException(
+ "Requested client shutdown"));
}
+ this.executorService.shutdown();
+ try {
+ executorService.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Executor service didn't shutdown in time.");
+ Thread.currentThread().interrupt();
+ }
+ return null;
}
- private <E extends PiEntity> void logWriteErrors(Collection<E> writeEntities,
- StatusRuntimeException ex,
- WriteOperationType opType,
- String entryType) {
+ private <E extends PiEntity> void checkAndLogWriteErrors(
+ Collection<E> writeEntities, StatusRuntimeException ex,
+ WriteOperationType opType, String entryType) {
+
+ checkGrpcException(ex);
+
List<P4RuntimeOuterClass.Error> errors = null;
String description = null;
try {
@@ -946,10 +962,80 @@
err.hasDetails() ? "\n" + err.getDetails().toString() : "");
}
+ private void checkGrpcException(StatusRuntimeException ex) {
+ switch (ex.getStatus().getCode()) {
+ case OK:
+ break;
+ case CANCELLED:
+ break;
+ case UNKNOWN:
+ break;
+ case INVALID_ARGUMENT:
+ break;
+ case DEADLINE_EXCEEDED:
+ break;
+ case NOT_FOUND:
+ break;
+ case ALREADY_EXISTS:
+ break;
+ case PERMISSION_DENIED:
+ // Notify upper layers that this node is not master.
+ controller.postEvent(new P4RuntimeEvent(
+ P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
+ new ArbitrationResponse(deviceId, false)));
+ break;
+ case RESOURCE_EXHAUSTED:
+ break;
+ case FAILED_PRECONDITION:
+ break;
+ case ABORTED:
+ break;
+ case OUT_OF_RANGE:
+ break;
+ case UNIMPLEMENTED:
+ break;
+ case INTERNAL:
+ break;
+ case UNAVAILABLE:
+ // Channel might be closed.
+ controller.postEvent(new P4RuntimeEvent(
+ P4RuntimeEvent.Type.CHANNEL_EVENT,
+ new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
+ break;
+ case DATA_LOSS:
+ break;
+ case UNAUTHENTICATED:
+ break;
+ default:
+ break;
+ }
+ }
+
+ private Uint128 bigIntegerToUint128(BigInteger value) {
+ final byte[] arr = value.toByteArray();
+ final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
+ .put(new byte[Long.BYTES * 2 - arr.length])
+ .put(arr);
+ bb.rewind();
+ return Uint128.newBuilder()
+ .setHigh(bb.getLong())
+ .setLow(bb.getLong())
+ .build();
+ }
+
+ private BigInteger uint128ToBigInteger(Uint128 value) {
+ return new BigInteger(
+ ByteBuffer.allocate(Long.BYTES * 2)
+ .putLong(value.getHigh())
+ .putLong(value.getLow())
+ .array());
+ }
+
/**
* Handles messages received from the device on the stream channel.
*/
- private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
+ private class StreamChannelResponseObserver
+ implements StreamObserver<StreamMessageResponse> {
@Override
public void onNext(StreamMessageResponse message) {
@@ -958,40 +1044,40 @@
private void doNext(StreamMessageResponse message) {
try {
- log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
+ log.debug("Received message on stream channel from {}: {}",
+ deviceId, message.getUpdateCase());
switch (message.getUpdateCase()) {
case PACKET:
- // Packet-in
doPacketIn(message.getPacket());
return;
case ARBITRATION:
- doArbitrationUpdateFromDevice(message.getArbitration());
+ doArbitrationResponse(message.getArbitration());
return;
default:
- log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
+ log.warn("Unrecognized stream message from {}: {}",
+ deviceId, message.getUpdateCase());
}
} catch (Throwable ex) {
- log.error("Exception while processing stream channel message from {}", deviceId, ex);
+ log.error("Exception while processing stream message from {}",
+ deviceId, ex);
}
}
@Override
public void onError(Throwable throwable) {
- log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
- controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
- new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_ERROR,
- throwable)));
- // FIXME: we might want to recreate the channel.
- // In general, we want to be robust against any transient error and, if the channel is open, make sure the
- // stream channel is always on.
+ log.warn("Error on stream channel for {}: {}",
+ deviceId, Status.fromThrowable(throwable));
+ controller.postEvent(new P4RuntimeEvent(
+ P4RuntimeEvent.Type.CHANNEL_EVENT,
+ new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
}
@Override
public void onCompleted() {
log.warn("Stream channel for {} has completed", deviceId);
- controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
- new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_DISCONNECTED,
- "Stream channel has completed")));
+ controller.postEvent(new P4RuntimeEvent(
+ P4RuntimeEvent.Type.CHANNEL_EVENT,
+ new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
}
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 987356b..d2773b2 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -35,20 +35,21 @@
import org.onosproject.grpc.api.GrpcChannelId;
import org.onosproject.grpc.api.GrpcController;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.ChannelEvent;
-import org.onosproject.net.device.ChannelListener;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.api.P4RuntimeEventListener;
-import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
+import java.math.BigInteger;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -65,14 +66,13 @@
extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
implements P4RuntimeController {
- private static final String P4R_ELECTION = "p4runtime-election";
private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
private final Logger log = getLogger(getClass());
private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
private final Map<DeviceId, ClientKey> deviceIdToClientKey = Maps.newHashMap();
private final Map<ClientKey, P4RuntimeClient> clientKeyToClient = Maps.newHashMap();
private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
- private final Map<DeviceId, List<ChannelListener>> channelListeners = Maps.newConcurrentMap();
+ private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
.expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
.build(new CacheLoader<DeviceId, ReadWriteLock>() {
@@ -81,8 +81,7 @@
return new ReentrantReadWriteLock();
}
});
-
- private AtomicCounter electionIdGenerator;
+ private DistributedElectionIdGenerator electionIdGenerator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private GrpcController grpcController;
@@ -93,8 +92,7 @@
@Activate
public void activate() {
eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
- electionIdGenerator = storageService.getAtomicCounter(P4R_ELECTION);
-
+ electionIdGenerator = new DistributedElectionIdGenerator(storageService);
log.info("Started");
}
@@ -102,6 +100,8 @@
@Deactivate
public void deactivate() {
grpcController = null;
+ electionIdGenerator.destroy();
+ electionIdGenerator = null;
eventDispatcher.removeSink(P4RuntimeEvent.class);
log.info("Stopped");
}
@@ -119,13 +119,13 @@
.usePlaintext(true);
deviceLocks.getUnchecked(deviceId).writeLock().lock();
- log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
- deviceId, serverAddr, serverPort, p4DeviceId);
try {
if (deviceIdToClientKey.containsKey(deviceId)) {
final ClientKey existingKey = deviceIdToClientKey.get(deviceId);
if (newKey.equals(existingKey)) {
+ log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
+ deviceId, serverAddr, serverPort, p4DeviceId);
return true;
} else {
throw new IllegalStateException(
@@ -133,6 +133,8 @@
"server endpoints already exists");
}
} else {
+ log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
+ deviceId, serverAddr, serverPort, p4DeviceId);
return doCreateClient(newKey, channelBuilder);
}
} finally {
@@ -187,12 +189,11 @@
public void removeClient(DeviceId deviceId) {
deviceLocks.getUnchecked(deviceId).writeLock().lock();
-
try {
if (deviceIdToClientKey.containsKey(deviceId)) {
final ClientKey clientKey = deviceIdToClientKey.get(deviceId);
- grpcController.disconnectChannel(channelIds.get(deviceId));
clientKeyToClient.remove(clientKey).shutdown();
+ grpcController.disconnectChannel(channelIds.get(deviceId));
deviceIdToClientKey.remove(deviceId);
channelIds.remove(deviceId);
}
@@ -222,7 +223,7 @@
log.debug("No client for {}, can't check for reachability", deviceId);
return false;
}
-
+ // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
return grpcController.isChannelOpen(channelIds.get(deviceId));
} finally {
deviceLocks.getUnchecked(deviceId).readLock().unlock();
@@ -230,67 +231,73 @@
}
@Override
- public long getNewMasterElectionId() {
- return electionIdGenerator.incrementAndGet();
+ public void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+ deviceAgentListeners.putIfAbsent(deviceId, new CopyOnWriteArrayList<>());
+ deviceAgentListeners.get(deviceId).add(listener);
}
@Override
- public void addChannelListener(DeviceId deviceId, ChannelListener listener) {
- channelListeners.compute(deviceId, (devId, listeners) -> {
- List<ChannelListener> newListeners;
- if (listeners != null) {
- newListeners = listeners;
- } else {
- newListeners = new ArrayList<>();
- }
- newListeners.add(listener);
- return newListeners;
+ public void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+ deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
+ listeners.remove(listener);
+ return listeners;
});
}
- @Override
- public void removeChannelListener(DeviceId deviceId, ChannelListener listener) {
- channelListeners.compute(deviceId, (devId, listeners) -> {
- if (listeners != null) {
- listeners.remove(listener);
- return listeners;
- } else {
- log.debug("Device {} has no listener registered", deviceId);
- return null;
- }
- });
+ BigInteger newMasterElectionId(DeviceId deviceId) {
+ return electionIdGenerator.generate(deviceId);
}
void postEvent(P4RuntimeEvent event) {
- if (event.type().equals(P4RuntimeEvent.Type.CHANNEL_EVENT)) {
- DefaultChannelEvent channelError = (DefaultChannelEvent) event.subject();
- DeviceId deviceId = event.subject().deviceId();
- ChannelEvent channelEvent = null;
- //If disconnection is already known we propagate it.
- if (channelError.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
- channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
- channelError.throwable());
- } else if (channelError.type().equals(ChannelEvent.Type.CHANNEL_ERROR)) {
- //If we don't know what the error is we check for reachability
- if (!isReacheable(deviceId)) {
- //if false the channel has disconnected
- channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
- channelError.throwable());
- } else {
- // else we propagate the event.
- channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_ERROR, channelError.deviceId(),
- channelError.throwable());
- }
- }
- //Ignoring CHANNEL_CONNECTED
- if (channelEvent != null && channelListeners.get(deviceId) != null) {
- for (ChannelListener listener : channelListeners.get(deviceId)) {
- listener.event(channelEvent);
- }
- }
- } else {
- post(event);
+ switch (event.type()) {
+ case CHANNEL_EVENT:
+ handleChannelEvent(event);
+ break;
+ case ARBITRATION_RESPONSE:
+ handleArbitrationReply(event);
+ break;
+ default:
+ post(event);
+ break;
}
}
+ private void handleChannelEvent(P4RuntimeEvent event) {
+ final ChannelEvent channelEvent = (ChannelEvent) event.subject();
+ final DeviceId deviceId = channelEvent.deviceId();
+ final DeviceAgentEvent.Type agentEventType;
+ switch (channelEvent.type()) {
+ case OPEN:
+ agentEventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
+ break;
+ case CLOSED:
+ agentEventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+ break;
+ case ERROR:
+ agentEventType = !isReacheable(deviceId)
+ ? DeviceAgentEvent.Type.CHANNEL_CLOSED
+ : DeviceAgentEvent.Type.CHANNEL_ERROR;
+ break;
+ default:
+ log.warn("Unrecognized channel event type {}", channelEvent.type());
+ return;
+ }
+ postDeviceAgentEvent(deviceId, new DeviceAgentEvent(agentEventType, deviceId));
+ }
+
+ private void handleArbitrationReply(P4RuntimeEvent event) {
+ final DeviceId deviceId = event.subject().deviceId();
+ final ArbitrationResponse response = (ArbitrationResponse) event.subject();
+ final DeviceAgentEvent.Type roleType = response.isMaster()
+ ? DeviceAgentEvent.Type.ROLE_MASTER
+ : DeviceAgentEvent.Type.ROLE_STANDBY;
+ postDeviceAgentEvent(deviceId, new DeviceAgentEvent(
+ roleType, response.deviceId()));
+ }
+
+ private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
+ if (deviceAgentListeners.containsKey(deviceId)) {
+ deviceAgentListeners.get(deviceId).forEach(l -> l.event(event));
+ }
+ }
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketInEvent.java
similarity index 89%
rename from protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java
rename to protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketInEvent.java
index a1cce46..dddee2b 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketInEvent.java
@@ -25,14 +25,14 @@
import static com.google.common.base.Preconditions.checkNotNull;
/**
- * Default implementation of a packet-in in P4Runtime.
+ * P4Runtime packet-in.
*/
-final class DefaultPacketIn implements P4RuntimePacketIn {
+final class PacketInEvent implements P4RuntimePacketIn {
private final DeviceId deviceId;
private final PiPacketOperation operation;
- DefaultPacketIn(DeviceId deviceId, PiPacketOperation operation) {
+ PacketInEvent(DeviceId deviceId, PiPacketOperation operation) {
this.deviceId = checkNotNull(deviceId);
this.operation = checkNotNull(operation);
}
@@ -55,7 +55,7 @@
if (o == null || getClass() != o.getClass()) {
return false;
}
- DefaultPacketIn that = (DefaultPacketIn) o;
+ PacketInEvent that = (PacketInEvent) o;
return Objects.equal(deviceId, that.deviceId) &&
Objects.equal(operation, that.operation);
}
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index 1e83577..893049f 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -144,7 +144,7 @@
}
@AfterClass
- public static void globalTeerDown() {
+ public static void globalTearDown() {
grpcServer.shutdown();
grpcChannel.shutdown();
}
@@ -156,7 +156,7 @@
client = new P4RuntimeClientImpl(DEVICE_ID, P4_DEVICE_ID,
grpcChannel,
controller);
- client.p4RuntimeElectionId = DEFAULT_ELECTION_ID;
+ client.becomeMaster();
}
@Test
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/DefaultPacketInTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/PacketInEventTest.java
similarity index 88%
rename from protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/DefaultPacketInTest.java
rename to protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/PacketInEventTest.java
index 5993142..0a1f82e 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/DefaultPacketInTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/PacketInEventTest.java
@@ -32,7 +32,7 @@
/**
* Test for DefaultPacketIn class.
*/
-public class DefaultPacketInTest {
+public class PacketInEventTest {
private static final int DEFAULT_ORIGINAL_VALUE = 255;
private static final int DEFAULT_BIT_WIDTH = 9;
@@ -46,10 +46,10 @@
private PiPacketOperation packetOperation2;
private PiPacketOperation nullPacketOperation = null;
- private DefaultPacketIn packetIn;
- private DefaultPacketIn sameAsPacketIn;
- private DefaultPacketIn packetIn2;
- private DefaultPacketIn packetIn3;
+ private PacketInEvent packetIn;
+ private PacketInEvent sameAsPacketIn;
+ private PacketInEvent packetIn2;
+ private PacketInEvent packetIn3;
/**
* Setup method for packetOperation and packetOperation2.
@@ -78,10 +78,10 @@
.build())
.build();
- packetIn = new DefaultPacketIn(deviceId, packetOperation);
- sameAsPacketIn = new DefaultPacketIn(sameDeviceId, packetOperation);
- packetIn2 = new DefaultPacketIn(deviceId2, packetOperation);
- packetIn3 = new DefaultPacketIn(deviceId, packetOperation2);
+ packetIn = new PacketInEvent(deviceId, packetOperation);
+ sameAsPacketIn = new PacketInEvent(sameDeviceId, packetOperation);
+ packetIn2 = new PacketInEvent(deviceId2, packetOperation);
+ packetIn3 = new PacketInEvent(deviceId, packetOperation2);
}
/**
@@ -105,7 +105,7 @@
@Test(expected = NullPointerException.class)
public void testConstructorWithNullDeviceId() {
- new DefaultPacketIn(nullDeviceId, packetOperation);
+ new PacketInEvent(nullDeviceId, packetOperation);
}
/**
@@ -114,7 +114,7 @@
@Test(expected = NullPointerException.class)
public void testConstructorWithNullPacketOperation() {
- new DefaultPacketIn(deviceId, nullPacketOperation);
+ new PacketInEvent(deviceId, nullPacketOperation);
}
/**
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 550ca59..2b31b91 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -50,9 +50,9 @@
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.basics.BasicDeviceConfig;
import org.onosproject.net.config.basics.SubjectFactories;
-import org.onosproject.net.device.ChannelEvent;
-import org.onosproject.net.device.ChannelListener;
import org.onosproject.net.device.DefaultDeviceDescription;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
import org.onosproject.net.device.DeviceEvent;
@@ -95,6 +95,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -102,6 +103,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.device.DeviceEvent.Type;
@@ -116,45 +118,48 @@
@Component(immediate = true)
public class GeneralDeviceProvider extends AbstractProvider
implements DeviceProvider {
- public static final String DRIVER = "driver";
- public static final int REACHABILITY_TIMEOUT = 10;
- public static final String DEPLOY = "deploy-";
- public static final String PIPECONF_TOPIC = "-pipeconf";
- public static final String CHECK = "check-";
- public static final String CONNECTION = "-connection";
+
+ // Timeout in seconds for operations on devices.
+ private static final int DEVICE_OP_TIMEOUT = 10;
+
+ private static final String DRIVER = "driver";
+ private static final String DEPLOY = "deploy-";
+ private static final String PIPECONF_TOPIC = "-pipeconf";
+ private static final String CHECK = "check-";
+ private static final String CONNECTION = "-connection";
private static final String POLL_FREQUENCY = "pollFrequency";
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceProviderRegistry providerRegistry;
+ private DeviceProviderRegistry providerRegistry;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ComponentConfigService componentConfigService;
+ private ComponentConfigService componentConfigService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected NetworkConfigRegistry cfgService;
+ private NetworkConfigRegistry cfgService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CoreService coreService;
+ private CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceService deviceService;
+ private DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DriverService driverService;
+ private DriverService driverService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MastershipService mastershipService;
+ private MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected PiPipeconfService piPipeconfService;
+ private PiPipeconfService piPipeconfService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
+ private ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LeadershipService leadershipService;
+ private LeadershipService leadershipService;
private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
@Property(name = POLL_FREQUENCY, intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
@@ -168,9 +173,9 @@
"default is 10 sec")
private int deviceAvailabilityPollFrequency = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS;
- protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
- protected static final String URI_SCHEME = "device";
- protected static final String CFG_SCHEME = "generalprovider";
+ private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
+ private static final String URI_SCHEME = "device";
+ private static final String CFG_SCHEME = "generalprovider";
private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
private static final int CORE_POOL_SIZE = 10;
private static final String UNKNOWN = "unknown";
@@ -187,25 +192,24 @@
private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
+ private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
- protected ScheduledExecutorService connectionExecutor
- = newScheduledThreadPool(CORE_POOL_SIZE,
- groupedThreads("onos/generaldeviceprovider-device",
- "connection-executor-%d", log));
- protected ScheduledExecutorService portStatsExecutor
- = newScheduledThreadPool(CORE_POOL_SIZE,
- groupedThreads("onos/generaldeviceprovider-port-stats",
- "port-stats-executor-%d", log));
- protected ScheduledExecutorService availabilityCheckExecutor
- = newScheduledThreadPool(CORE_POOL_SIZE,
- groupedThreads("onos/generaldeviceprovider-availability-check",
- "availability-check-executor-%d", log));
- protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
- protected DeviceProviderService providerService;
+ private ExecutorService connectionExecutor
+ = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
+ "onos/generaldeviceprovider-device-connect", "%d", log));
+ private ScheduledExecutorService portStatsExecutor
+ = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
+ "onos/generaldeviceprovider-port-stats", "%d", log));
+ private ScheduledExecutorService availabilityCheckExecutor
+ = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
+ "onos/generaldeviceprovider-availability-check", "%d", log));
+ private ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
+
+ private DeviceProviderService providerService;
private InternalDeviceListener deviceListener = new InternalDeviceListener();
- protected final ConfigFactory factory =
+ private final ConfigFactory factory =
new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
SubjectFactories.DEVICE_SUBJECT_FACTORY,
GeneralProviderDeviceConfig.class, CFG_SCHEME) {
@@ -215,8 +219,8 @@
}
};
- protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
- private ChannelListener channelListener = new InternalChannelListener();
+ private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
+ private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
@Activate
@@ -233,7 +237,8 @@
cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
.forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
//Initiating a periodic check to see if any device is available again and reconnect it.
- availabilityCheckExecutor.scheduleAtFixedRate(this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
+ availabilityCheckExecutor.scheduleAtFixedRate(
+ this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
deviceAvailabilityPollFrequency, TimeUnit.SECONDS);
modified(context);
log.info("Started");
@@ -255,12 +260,12 @@
Set<DeviceId> deviceSubjects =
cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
deviceSubjects.forEach(deviceId -> {
- if (!compareScheme(deviceId)) {
+ if (notMyScheme(deviceId)) {
// not under my scheme, skipping
log.debug("{} is not my scheme, skipping", deviceId);
return;
}
- scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, true));
+ scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, true));
});
}
}
@@ -299,17 +304,18 @@
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
log.info("Received role {} for device {}", newRole, deviceId);
- CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
- roleReply.thenAcceptAsync(mastership -> {
- providerService.receivedRoleReply(deviceId, newRole, mastership);
- if (!mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) != null) {
- scheduledTasks.get(deviceId).cancel(false);
- scheduledTasks.remove(deviceId);
- } else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) {
- scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, false));
- updatePortStatistics(deviceId);
- }
- });
+ requestedRoles.put(deviceId, newRole);
+ connectionExecutor.submit(() -> doRoleChanged(deviceId, newRole));
+ }
+
+ private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
+ DeviceHandshaker handshaker = getHandshaker(deviceId);
+ if (handshaker == null) {
+ log.warn("Null handshaker. Unable to notify new role {} to {}",
+ newRole, deviceId);
+ return;
+ }
+ handshaker.roleChanged(newRole);
}
@Override
@@ -321,9 +327,9 @@
return false;
}
- CompletableFuture<Boolean> reachable = handshaker.isReachable();
try {
- return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS);
+ return handshaker.isReachable()
+ .get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn("Device {} is not reachable {}", deviceId, e.getMessage());
log.debug("Exception", e);
@@ -358,11 +364,8 @@
@Override
public void triggerDisconnect(DeviceId deviceId) {
log.debug("Triggering disconnection of device {}", deviceId);
- connectionExecutor.execute(() -> {
- disconnectDevice(deviceId).whenComplete((success, ex) -> {
- checkAndConnect(deviceId);
- });
- });
+ connectionExecutor.execute(() -> disconnectDevice(deviceId)
+ .thenRunAsync(() -> checkAndConnect(deviceId)));
}
private DeviceHandshaker getHandshaker(DeviceId deviceId) {
@@ -438,7 +441,7 @@
connected.thenAcceptAsync(result -> {
if (result) {
- handshaker.addChannelListener(channelListener);
+ handshaker.addDeviceAgentListener(deviceAgentListener);
//Populated with the default values obtained by the driver
ChassisId cid = new ChassisId();
SparseAnnotations annotations = DefaultAnnotations.builder()
@@ -496,7 +499,7 @@
//Connecting to the device
handshaker.connect().thenAcceptAsync(result -> {
if (result) {
- handshaker.addChannelListener(channelListener);
+ handshaker.addDeviceAgentListener(deviceAgentListener);
handlePipeconf(deviceId, handshaker.handler().driver(), handshaker.data(), false);
}
});
@@ -583,32 +586,29 @@
private CompletableFuture<Boolean> disconnectDevice(DeviceId deviceId) {
log.info("Disconnecting for device {}", deviceId);
- CompletableFuture<Boolean> disconnectError = new CompletableFuture<>();
+ if (scheduledTasks.containsKey(deviceId)) {
+ scheduledTasks.remove(deviceId).cancel(true);
+ }
DeviceHandshaker handshaker = handshakers.remove(deviceId);
- if (handshaker != null) {
- handshaker.disconnect().thenAcceptAsync(result -> {
- if (result) {
- log.info("Disconnected device {}", deviceId);
- providerService.deviceDisconnected(deviceId);
- disconnectError.complete(true);
- } else {
- disconnectError.complete(false);
- log.warn("Device {} was unable to disconnect", deviceId);
- }
- });
- } else {
- //gracefully ignoring.
+
+ if (handshaker == null) {
+ // gracefully ignoring.
log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
- "shutdown of communication", deviceId);
- disconnectError.complete(false);
+ "shutdown of communication", deviceId);
+ return CompletableFuture.completedFuture(false);
}
- ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
- if (pollingStatisticsTask != null) {
- pollingStatisticsTask.cancel(true);
- scheduledTasks.remove(deviceId);
- }
- return disconnectError;
+
+ return handshaker.disconnect()
+ .thenApplyAsync(result -> {
+ if (result) {
+ log.info("Disconnected device {}", deviceId);
+ providerService.deviceDisconnected(deviceId);
+ } else {
+ log.warn("Device {} was unable to disconnect", deviceId);
+ }
+ return result;
+ });
}
//Needed to catch the exception in the executors since are not rethrown otherwise.
@@ -637,8 +637,8 @@
}
}
- private boolean compareScheme(DeviceId deviceId) {
- return deviceId.uri().getScheme().equals(URI_SCHEME);
+ private boolean notMyScheme(DeviceId deviceId) {
+ return !deviceId.uri().getScheme().equals(URI_SCHEME);
}
/**
@@ -650,7 +650,7 @@
public void event(NetworkConfigEvent event) {
DeviceId deviceId = (DeviceId) event.subject();
//Assuming that the deviceId comes with uri 'device:'
- if (!compareScheme(deviceId)) {
+ if (notMyScheme(deviceId)) {
// not under my scheme, skipping
log.debug("{} is not my scheme, skipping", deviceId);
return;
@@ -765,7 +765,7 @@
pipelineConfigured.remove(deviceId);
}
- private ScheduledFuture<?> scheduleStatistcsPolling(DeviceId deviceId, boolean randomize) {
+ private ScheduledFuture<?> scheduleStatsPolling(DeviceId deviceId, boolean randomize) {
int delay = 0;
if (randomize) {
delay = new SecureRandom().nextInt(10);
@@ -826,6 +826,34 @@
return present;
}
+ private void handleChannelClosed(DeviceId deviceId) {
+ disconnectDevice(deviceId).thenRunAsync(() -> {
+ // If master, notifies disconnection to the core.
+ if (mastershipService.isLocalMaster(deviceId)) {
+ log.info("Disconnecting device {}, due to channel closed event",
+ deviceId);
+ providerService.deviceDisconnected(deviceId);
+ }
+ });
+ }
+
+ private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
+ //Notify core about response.
+ if (!requestedRoles.containsKey(deviceId)) {
+ return;
+ }
+ providerService.receivedRoleReply(deviceId, requestedRoles.get(deviceId), response);
+ // If not master, cancel polling tasks, otherwise start them.
+ if (!response.equals(MastershipRole.MASTER)
+ && scheduledTasks.get(deviceId) != null) {
+ scheduledTasks.remove(deviceId).cancel(false);
+ } else if (response.equals(MastershipRole.MASTER)
+ && scheduledTasks.get(deviceId) == null) {
+ scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
+ updatePortStatistics(deviceId);
+ }
+ }
+
/**
* Listener for core device events.
*/
@@ -834,11 +862,10 @@
public void event(DeviceEvent event) {
DeviceId deviceId = event.subject().id();
// FIXME handling for mastership change scenario missing?
-
// For now this is scheduled periodically, when streaming API will
// be available we check and base it on the streaming API (e.g. gNMI)
if (mastershipService.isLocalMaster(deviceId)) {
- scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, false));
+ scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
}
}
@@ -850,26 +877,37 @@
}
/**
- * Listener for device channel events.
+ * Listener for device agent events.
*/
- private class InternalChannelListener implements ChannelListener {
+ private class InternalDeviceAgentListener implements DeviceAgentListener {
@Override
- public void event(ChannelEvent event) {
+ public void event(DeviceAgentEvent event) {
DeviceId deviceId = event.subject();
- if (event.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
- //let's properly handle device disconnection
- CompletableFuture<Boolean> disconnection = disconnectDevice(deviceId);
- disconnection.thenAcceptAsync(result -> {
- //If master notifying of disconnection to the core.
- if (mastershipService.isLocalMaster(deviceId)) {
- log.info("Disconnecting unreachable device {}, due to error on channel", deviceId);
- providerService.deviceDisconnected(deviceId);
- }
- });
-
+ switch (event.type()) {
+ case CHANNEL_OPEN:
+ // Ignore.
+ break;
+ case CHANNEL_CLOSED:
+ handleChannelClosed(deviceId);
+ break;
+ case CHANNEL_ERROR:
+ // TODO evaluate other reaction to channel error.
+ log.warn("Received CHANNEL_ERROR from {}. Is the channel still open?",
+ deviceId);
+ break;
+ case ROLE_MASTER:
+ handleMastershipResponse(deviceId, MastershipRole.MASTER);
+ break;
+ case ROLE_STANDBY:
+ handleMastershipResponse(deviceId, MastershipRole.STANDBY);
+ break;
+ case ROLE_NONE:
+ handleMastershipResponse(deviceId, MastershipRole.NONE);
+ break;
+ default:
+ log.warn("Unrecognized device agent event {}", event.type());
}
- //TODO evaluate other type of reactions.
}
}