Refactor channel and mastership handling in P4Runtime
This (big) change aims at solving the issue observed with mastership flapping
and device connection/disconnection with P4Runtime.
Channel handling is now based on the underlying gRPC channel state. Before,
channel events (open/close/error) were generated as a consequence of P4Runtime
StreamChannel events, making device availability dependent on mastership. Now
StreamChannel events only affect mastership (MASTER/STANDBY or NONE when the
StreamChannel RPC is not active).
Mastership handling has been refactored to generate P4Runtime election IDs that
are compatible with the mastership preference decided by the MastershipService.
GeneralDeviceProvider has been re-implemented to support in-order
device event processing and to reduce implementation complexity. Stats polling
has been moved to a separate component, and netcfg handling updated to only
depend on BasicDeviceConfig, augmented with a pipeconf field, and re-using the
managementAddress field to set the gRPC server endpoints (e.g.
grpc://myswitch.local:50051). Before, it depended on 3 different config
classes, making it hard to detect changes.
Finally, this change affects some core interfaces:
- Adds a method to DeviceProvider and DeviceHandshaker to check for device
availability, making the meaning of availability device-specific. This is needed
in cases where the device manager needs to change the availability state of a
device (as in change #20842)
- Support device providers not capable of reconciling mastership role responses
with requests (like P4Runtime).
- Clarify the meaning of "connection" in the DeviceConnect behavior.
- Allows driver-based providers to check devices for reachability and
availability without probing the device via the network.
Change-Id: I7ff30d29f5d02ad938e3171536e54ae2916629a2
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
index 6e65dd3..796ef55 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
@@ -68,12 +68,4 @@
* Terminates the subscription channel of this device.
*/
void terminateSubscriptionChannel();
-
- /**
- * Check weather the gNMI service is available or not by sending a dummy get
- * request message.
- *
- * @return true if gNMI service available; false otherwise
- */
- CompletableFuture<Boolean> isServiceAvailable();
}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java
index 137cbb8..3a66c18 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java
@@ -20,6 +20,8 @@
import org.onosproject.grpc.api.GrpcClientKey;
import org.onosproject.net.DeviceId;
+import java.net.URI;
+
/**
* Key that uniquely identifies a gNMI client.
*/
@@ -31,11 +33,10 @@
/**
* Creates a new gNMI client key.
*
- * @param deviceId ONOS device ID
- * @param serverAddr gNMI server address
- * @param serverPort gNMI server port
+ * @param deviceId ONOS device ID
+ * @param serverUri gNMI server URI
*/
- public GnmiClientKey(DeviceId deviceId, String serverAddr, int serverPort) {
- super(GNMI, deviceId, serverAddr, serverPort);
+ public GnmiClientKey(DeviceId deviceId, URI serverUri) {
+ super(GNMI, deviceId, serverUri);
}
}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
index 3701ff7..6645f39 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -50,7 +50,7 @@
private GnmiSubscriptionManager gnmiSubscriptionManager;
GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
- super(clientKey);
+ super(clientKey, managedChannel, false, controller);
this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
this.gnmiSubscriptionManager =
new GnmiSubscriptionManager(managedChannel, deviceId, controller);
@@ -82,7 +82,7 @@
}
@Override
- public CompletableFuture<Boolean> isServiceAvailable() {
+ public CompletableFuture<Boolean> probeService() {
return supplyInContext(this::doServiceAvailable, "isServiceAvailable");
}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
index c0bdcdd..9c4643a 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
@@ -23,47 +23,24 @@
import org.onosproject.gnmi.api.GnmiEvent;
import org.onosproject.gnmi.api.GnmiEventListener;
import org.onosproject.grpc.ctl.AbstractGrpcClientController;
-import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.slf4j.Logger;
-
-import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation of gNMI controller.
*/
@Component(immediate = true, service = GnmiController.class)
public class GnmiControllerImpl
- extends AbstractGrpcClientController<GnmiClientKey, GnmiClient, GnmiEvent, GnmiEventListener>
+ extends AbstractGrpcClientController
+ <GnmiClientKey, GnmiClient, GnmiEvent, GnmiEventListener>
implements GnmiController {
- private final Logger log = getLogger(getClass());
-
- @Activate
- public void activate() {
- super.activate();
- eventDispatcher.addSink(GnmiEvent.class, listenerRegistry);
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- super.deactivate();
- log.info("Stopped");
+ public GnmiControllerImpl() {
+ super(GnmiEvent.class);
}
@Override
- protected GnmiClient createClientInstance(GnmiClientKey clientKey, ManagedChannel channel) {
+ protected GnmiClient createClientInstance(
+ GnmiClientKey clientKey, ManagedChannel channel) {
return new GnmiClientImpl(clientKey, channel, this);
}
-
- /**
- * Handles event from gNMI client.
- *
- * @param event the gNMI event
- */
- void postEvent(GnmiEvent event) {
- post(event);
- }
}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
index 5b07254..ad4324b 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
/**
* Abstraction of a gRPC controller that stores and manages gRPC channels.
@@ -33,10 +34,10 @@
int CONNECTION_TIMEOUT_SECONDS = 20;
/**
- * Creates a gRPC managed channel from the given builder and opens a
- * connection to it. If the connection is successful returns the managed
- * channel object and stores the channel internally, associated with the
- * given channel ID.
+ * Creates a gRPC managed channel from the given builder and opens the
+ * connection. If the connection is successful, returns the managed channel
+ * object and stores the channel internally, associated with the given
+ * channel ID.
* <p>
* This method blocks until the channel is open or a timeout expires. By
* default the timeout is {@link #CONNECTION_TIMEOUT_SECONDS} seconds. If
@@ -72,15 +73,6 @@
Map<GrpcChannelId, ManagedChannel> getChannels();
/**
- * Returns true if the channel associated with the given identifier is open,
- * i.e. the server is able to successfully reply to RPCs, false otherwise.
- *
- * @param channelId channel ID
- * @return true if channel is open, false otherwise.
- */
- boolean isChannelOpen(GrpcChannelId channelId);
-
- /**
* If present, returns the channel associated with the given ID.
*
* @param channelId channel ID
@@ -88,4 +80,14 @@
*/
Optional<ManagedChannel> getChannel(GrpcChannelId channelId);
+ /**
+ * Probes the server at the endpoint of the given channel. Returns true if
+ * the server responded to the probe, false otherwise or if the channel does
+ * not exist.
+ *
+ * @param channelId channel ID
+ * @return completable future eventually true if the gRPC server responded
+ * to the probe; false otherwise
+ */
+ CompletableFuture<Boolean> probeChannel(GrpcChannelId channelId);
}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
index d040a23..d529529 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
@@ -17,11 +17,11 @@
package org.onosproject.grpc.api;
import com.google.common.annotations.Beta;
+
import java.util.concurrent.CompletableFuture;
/**
* Abstraction of a gRPC client.
- *
*/
@Beta
public interface GrpcClient {
@@ -33,4 +33,30 @@
* procedure
*/
CompletableFuture<Void> shutdown();
+
+ /**
+ * This method provides a coarse modelling of gRPC channel {@link
+ * io.grpc.ConnectivityState}. The implementation does not make any attempt
+ * at probing the server by sending messages over the network, as such it
+ * does not block execution. It returns true if this client is expected to
+ * communicate with the server successfully. In other words, if any RPC
+ * would be executed immediately after this method is called and returns
+ * true, then it is expected, but NOT guaranteed, for that RPC message to
+ * reach the server and be processed. If false, it means the channel is in a
+ * failure state and communication with the server is unlikely to happen
+ * soon.
+ *
+ * @return true if server is deemed reachable, false otherwise
+ */
+ boolean isServerReachable();
+
+ /**
+ * Similar to {@link #isServerReachable()}, but might involve sending
+ * packets over the network. This checks whether the specific gRPC
+ * service(s) required by this client is available or not on the server.
+ *
+ * @return completable future eventually true if the gRPC service(s) on the
+ * server was available during the probe; false otherwise
+ */
+ CompletableFuture<Boolean> probeService();
}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
index 3cdbcd1..b47d499 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
@@ -18,10 +18,11 @@
import com.google.common.annotations.Beta;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.provider.ProviderId;
/**
- * Abstraction of a gRPC controller which controls specific gRPC client {@link
- * C} with specific client key {@link K}.
+ * Abstraction of controller that manages gRPC clients.
*
* @param <K> the gRPC client key
* @param <C> the gRPC client type
@@ -34,20 +35,27 @@
* given information. As a result of this method, a client can be later
* obtained by invoking {@link #getClient(DeviceId)}.
* <p>
+ * Upon creation, a connection to the server is automatically started, which
+ * blocks execution. If the connection is successful, the client is created
+ * and this method returns true, otherwise (e.g., socket error) any state
+ * associated with this client is destroyed and returns false.
+ * <p>
* Only one client can exist for the same device ID. Calls to this method
* are idempotent fot the same client key, i.e. returns true if such client
- * already exists but a new one is not created. If there exists a client
- * with same device ID but different address and port, removes old one and
- * recreate new one.
+ * already exists. Otherwise, if a client for the same device ID but
+ * different client key already exists, throws an exception.
*
* @param clientKey the client key
* @return true if the client was created and the channel to the server is
* open; false otherwise
+ * @throws IllegalArgumentException if a client for the same device ID but
+ * different client key already exists.
*/
boolean createClient(K clientKey);
/**
- * Retrieves the gRPC client to operate on the given device.
+ * Returns the gRPC client previously created for the given device, or null
+ * if such client does not exist.
*
* @param deviceId the device identifier
* @return the gRPC client of the device if exists; null otherwise
@@ -55,24 +63,47 @@
C getClient(DeviceId deviceId);
/**
- * Removes the gRPC client for the given device. If no client exists for the
- * given device, the result is a no-op.
+ * Returns the gRPC client previously created for the given client key, or
+ * null if such client does not exist.
+ *
+ * @param clientKey client key
+ * @return the gRPC client of the device if exists; null otherwise
+ */
+ C getClient(K clientKey);
+
+ /**
+ * Removes the gRPC client for the given device and any gRPC channel state
+ * associated to it. If no client exists for the given device, the result is
+ * a no-op.
*
* @param deviceId the device identifier
*/
void removeClient(DeviceId deviceId);
/**
- * Check reachability of the gRPC server running on the given device.
- * Reachability can be tested only if a client is previously created using
- * {@link #createClient(GrpcClientKey)}. Note that this only checks the
- * reachability instead of checking service availability, different
- * service-specific gRPC clients might check service availability in a
- * different way.
+ * Similar to {@link #removeClient(DeviceId)} but uses the client key to
+ * identify the client to remove.
*
- * @param deviceId the device identifier
- * @return true if client was created and is able to contact the gNMI
- * server; false otherwise
+ * @param clientKey the client key
*/
- boolean isReachable(DeviceId deviceId);
+ void removeClient(K clientKey);
+
+ /**
+ * Adds a listener for device agent events for the given provider.
+ *
+ * @param deviceId device identifier
+ * @param providerId provider ID
+ * @param listener the device agent listener
+ */
+ void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId,
+ DeviceAgentListener listener);
+
+ /**
+ * Removes the listener for device agent events that was previously
+ * registered for the given provider.
+ *
+ * @param deviceId device identifier
+ * @param providerId the provider ID
+ */
+ void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId);
}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
index 99ea23f..d1d0b0f 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
@@ -20,8 +20,11 @@
import com.google.common.base.Objects;
import org.onosproject.net.DeviceId;
+import java.net.URI;
+
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
/**
@@ -29,32 +32,36 @@
*/
@Beta
public class GrpcClientKey {
+
+ private static final String GRPC = "grpc";
+ private static final String GRPCS = "grpcs";
+
private final String serviceName;
private final DeviceId deviceId;
- private final String serverAddr;
- private final int serverPort;
+ private final URI serverUri;
/**
* Creates a new client key.
*
* @param serviceName gRPC service name of the client
- * @param deviceId ONOS device ID
- * @param serverAddr gRPC server address
- * @param serverPort gRPC server port
+ * @param deviceId ONOS device ID
+ * @param serverUri gRPC server URI
*/
- public GrpcClientKey(String serviceName, DeviceId deviceId, String serverAddr, int serverPort) {
+ public GrpcClientKey(String serviceName, DeviceId deviceId, URI serverUri) {
checkNotNull(serviceName);
checkNotNull(deviceId);
- checkNotNull(serverAddr);
+ checkNotNull(serverUri);
checkArgument(!serviceName.isEmpty(),
- "Service name can not be null");
- checkArgument(!serverAddr.isEmpty(),
- "Server address should not be empty");
- checkArgument(serverPort > 0 && serverPort <= 65535, "Invalid server port");
+ "Service name can not be null");
+ // URI.getScheme() returns null for scheme-less URIs (e.g. "//host:50051").
+ // Compare constant-first so such input is rejected with an
+ // IllegalArgumentException instead of a NullPointerException.
+ checkArgument(GRPC.equals(serverUri.getScheme())
+ || GRPCS.equals(serverUri.getScheme()),
+ format("Server URI scheme must be %s or %s", GRPC, GRPCS));
+ checkArgument(!isNullOrEmpty(serverUri.getHost()),
+ "Server host address should not be empty");
+ checkArgument(serverUri.getPort() > 0 && serverUri.getPort() <= 65535, "Invalid server port");
this.serviceName = serviceName;
this.deviceId = deviceId;
- this.serverAddr = serverAddr;
- this.serverPort = serverPort;
+ this.serverUri = serverUri;
}
/**
@@ -76,21 +83,21 @@
}
/**
- * Gets the gRPC server address.
+ * Returns the gRPC server URI.
*
- * @return the gRPC server address.
+ * @return the gRPC server URI.
*/
- public String serverAddr() {
- return serverAddr;
+ public URI serveUri() {
+ return serverUri;
}
/**
- * Gets the gRPC server port.
+ * Returns true if the client requires TLS/SSL, false otherwise.
*
- * @return the gRPC server port.
+ * @return boolean
*/
- public int serverPort() {
- return serverPort;
+ public boolean requiresSecureChannel() {
+ return serverUri.getScheme().equals(GRPCS);
}
@Override
@@ -102,19 +109,18 @@
return false;
}
GrpcClientKey that = (GrpcClientKey) o;
- return serverPort == that.serverPort &&
- Objects.equal(serviceName, that.serviceName) &&
+ return Objects.equal(serviceName, that.serviceName) &&
Objects.equal(deviceId, that.deviceId) &&
- Objects.equal(serverAddr, that.serverAddr);
+ Objects.equal(serverUri, that.serverUri);
}
@Override
public int hashCode() {
- return Objects.hashCode(serviceName, deviceId, serverAddr, serverPort);
+ return Objects.hashCode(serviceName, deviceId, serverUri);
}
@Override
public String toString() {
- return format("%s/%s@%s:%s", deviceId, serviceName, serverAddr, serverPort);
+ return format("%s/%s@%s", deviceId, serviceName, serverUri);
}
}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
index 0b21ca2..117c6e3 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -16,11 +16,14 @@
package org.onosproject.grpc.ctl;
+import io.grpc.ConnectivityState;
import io.grpc.Context;
+import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import org.onosproject.grpc.api.GrpcClient;
import org.onosproject.grpc.api.GrpcClientKey;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
import org.slf4j.Logger;
import java.util.concurrent.CompletableFuture;
@@ -28,6 +31,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
@@ -54,13 +58,51 @@
protected final ExecutorService executorService;
protected final DeviceId deviceId;
+ protected final ManagedChannel channel;
+ private final boolean persistent;
+ private final AbstractGrpcClientController controller;
+ private final AtomicBoolean channelOpen = new AtomicBoolean(false);
- protected AbstractGrpcClient(GrpcClientKey clientKey) {
+ /**
+ * Creates a new client for the given key and channel. Setting persistent
+ * to true prevents the gRPC channel from staying IDLE. The controller
+ * instance is needed to propagate channel events.
+ *
+ * @param clientKey client key
+ * @param channel channel
+ * @param persistent true if the gRPC should never stay IDLE
+ * @param controller controller
+ */
+ protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel,
+ boolean persistent, AbstractGrpcClientController controller) {
checkNotNull(clientKey);
+ checkNotNull(channel);
this.deviceId = clientKey.deviceId();
+ this.channel = channel;
+ this.persistent = persistent;
+ this.controller = controller;
this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
"onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
+
+ setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING);
+ }
+
+ @Override
+ public boolean isServerReachable() {
+ final ConnectivityState state = channel.getState(false);
+ switch (state) {
+ case READY:
+ case IDLE:
+ return true;
+ case CONNECTING:
+ case TRANSIENT_FAILURE:
+ case SHUTDOWN:
+ return false;
+ default:
+ log.error("Unrecognized channel connectivity state {}", state);
+ return false;
+ }
}
@Override
@@ -171,4 +213,85 @@
}
}, executor);
}
+
+ private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,
+ ConnectivityState sourceState) {
+ if (log.isTraceEnabled()) {
+ log.trace("Setting channel callback for {} with source state {}...",
+ deviceId, sourceState);
+ }
+ channel.notifyWhenStateChanged(
+ sourceState, new ChannelConnectivityCallback(deviceId, channel));
+ }
+
+ /**
+ * Runnable task invoked at each change of the channel connectivity state.
+ * New callbacks are created as long as the channel is not shut down.
+ */
+ private final class ChannelConnectivityCallback implements Runnable {
+
+ private final DeviceId deviceId;
+ private final ManagedChannel channel;
+
+ private ChannelConnectivityCallback(
+ DeviceId deviceId, ManagedChannel channel) {
+ this.deviceId = deviceId;
+ this.channel = channel;
+ }
+
+ /**
+ * Reads the current channel state, maps it to a device agent event (if
+ * any), posts the event to the controller when the open/not-open status
+ * changed, and re-arms the callback unless the channel is shut down.
+ */
+ @Override
+ public void run() {
+ final ConnectivityState newState = channel.getState(false);
+ final DeviceAgentEvent.Type eventType;
+ switch (newState) {
+ // On gRPC connectivity states:
+ // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
+ case READY:
+ eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
+ break;
+ case TRANSIENT_FAILURE:
+ eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
+ break;
+ case SHUTDOWN:
+ eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+ break;
+ case IDLE:
+ // IDLE and CONNECTING are transient states that will
+ // eventually move to READY or TRANSIENT_FAILURE. Do not
+ // generate an event for now.
+ if (persistent) {
+ log.debug("Forcing channel for {} to exit state IDLE...", deviceId);
+ // getState(true) requests a connection attempt.
+ channel.getState(true);
+ }
+ eventType = null;
+ break;
+ case CONNECTING:
+ eventType = null;
+ break;
+ default:
+ log.error("Unrecognized connectivity state {}", newState);
+ eventType = null;
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("Detected channel connectivity change for {}, new state is {}",
+ deviceId, newState);
+ }
+
+ if (eventType != null) {
+ // Avoid sending consecutive duplicate events. Only the
+ // open/not-open transition is tracked, so CHANNEL_ERROR and
+ // CHANNEL_CLOSED both count as "not open" here.
+ final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
+ final boolean past = channelOpen.getAndSet(present);
+ if (present != past) {
+ log.debug("Notifying event {} for {}", eventType, deviceId);
+ controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
+ }
+ }
+
+ if (newState != ConnectivityState.SHUTDOWN) {
+ // Channels never leave SHUTDOWN state, no need for a new callback.
+ setChannelCallback(deviceId, channel, newState);
+ }
+ }
+ }
}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
index c0a3c6b..3c38209 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
@@ -21,7 +21,6 @@
import io.grpc.ManagedChannel;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
-import io.netty.handler.ssl.NotSslRecordException;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.onosproject.event.AbstractListenerManager;
@@ -33,6 +32,9 @@
import org.onosproject.grpc.api.GrpcClientController;
import org.onosproject.grpc.api.GrpcClientKey;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.provider.ProviderId;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
@@ -41,11 +43,11 @@
import javax.net.ssl.SSLException;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;
@@ -74,40 +76,44 @@
private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
private final Map<K, C> clients = Maps.newHashMap();
private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+ private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
+ deviceAgentListeners = Maps.newConcurrentMap();
+ private final Class<E> eventClass;
private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected GrpcChannelController grpcChannelController;
+ public AbstractGrpcClientController(Class<E> eventClass) {
+ this.eventClass = eventClass;
+ }
+
@Activate
public void activate() {
+ eventDispatcher.addSink(eventClass, listenerRegistry);
log.info("Started");
}
@Deactivate
public void deactivate() {
+ eventDispatcher.removeSink(eventClass);
- clientKeys.keySet().forEach(this::removeClient);
+ // removeClient() deletes entries from clientKeys, so iterate over a
+ // snapshot: mutating a HashMap while walking its key set raises
+ // ConcurrentModificationException.
+ Maps.newHashMap(clientKeys).keySet().forEach(this::removeClient);
clientKeys.clear();
clients.clear();
channelIds.clear();
+ deviceAgentListeners.clear();
log.info("Stopped");
}
@Override
public boolean createClient(K clientKey) {
checkNotNull(clientKey);
- /*
- FIXME we might want to move "useTls" and "fallback" to properties of the netcfg and clientKey
- For now, we will first try to connect with TLS (accepting any cert), then fall back to
- plaintext for every device
- */
- return withDeviceLock(() -> doCreateClient(clientKey, true, true), clientKey.deviceId());
+ return withDeviceLock(() -> doCreateClient(clientKey),
+ clientKey.deviceId());
}
- private boolean doCreateClient(K clientKey, boolean useTls, boolean fallbackToPlainText) {
+ private boolean doCreateClient(K clientKey) {
DeviceId deviceId = clientKey.deviceId();
- String serverAddr = clientKey.serverAddr();
- int serverPort = clientKey.serverPort();
if (clientKeys.containsKey(deviceId)) {
final GrpcClientKey existingKey = clientKeys.get(deviceId);
@@ -116,19 +122,25 @@
clientName(clientKey), clientKey);
return true;
} else {
- log.info("Requested new {} with updated key, removing old client... (oldKey={})",
- clientName(clientKey), existingKey);
- doRemoveClient(deviceId);
+ throw new IllegalArgumentException(format(
+ "A client already exists for device %s (%s)",
+ deviceId, clientKey));
}
}
- log.info("Creating client for {} (server={}:{})...", deviceId, serverAddr, serverPort);
+ final String method = clientKey.requiresSecureChannel()
+ ? "TLS" : "plaintext TCP";
+
+ log.info("Connecting {} client for {} to server at {} using {}...",
+ clientKey.serviceName(), deviceId, clientKey.serveUri(), method);
SslContext sslContext = null;
- if (useTls) {
+ if (clientKey.requiresSecureChannel()) {
try {
- // Accept any server certificate; this is insecure and should not be used in production
- sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
+ // FIXME: Accept any server certificate; this is insecure and
+ // should not be used in production
+ sslContext = GrpcSslContexts.forClient().trustManager(
+ InsecureTrustManagerFactory.INSTANCE).build();
} catch (SSLException e) {
log.error("Failed to build SSL Context", e);
return false;
@@ -137,17 +149,15 @@
GrpcChannelId channelId = GrpcChannelId.of(clientKey.toString());
NettyChannelBuilder channelBuilder = NettyChannelBuilder
- .forAddress(serverAddr, serverPort)
+ .forAddress(clientKey.serveUri().getHost(),
+ clientKey.serveUri().getPort())
.maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
+
if (sslContext != null) {
- log.debug("Using SSL for gRPC connection to {}", deviceId);
channelBuilder
.sslContext(sslContext)
.useTransportSecurity();
} else {
- checkState(!useTls,
- "Not authorized to use plaintext for gRPC connection to {}", deviceId);
- log.debug("Using plaintext TCP for gRPC connection to {}", deviceId);
channelBuilder.usePlaintext();
}
@@ -156,24 +166,9 @@
try {
channel = grpcChannelController.connectChannel(channelId, channelBuilder);
} catch (Throwable e) {
- for (Throwable cause = e; cause != null; cause = cause.getCause()) {
- if (useTls && cause instanceof NotSslRecordException) {
- // Likely root cause is that server is using plaintext
- log.info("Failed to connect to server (device={}) using TLS", deviceId);
- log.debug("TLS connection exception", e);
- if (fallbackToPlainText) {
- log.info("Falling back to plaintext for connection to {}", deviceId);
- return doCreateClient(clientKey, false, false);
- }
- }
- if (!useTls && "Connection reset by peer".equals(cause.getMessage())) {
- // Not a great signal, but could indicate the server is expected a TLS connection
- log.error("Failed to connect to server (device={}) using plaintext TCP; is the server using TLS?",
- deviceId);
- break;
- }
- }
- log.warn("Unable to connect to gRPC server for {}", deviceId, e);
+ log.warn("Failed to connect to {} ({}) using {}: {}",
+ deviceId, clientKey.serveUri(), method, e.toString());
+ log.debug("gRPC client connection exception", e);
return false;
}
@@ -216,37 +211,79 @@
}
@Override
+ public C getClient(K clientKey) {
+ checkNotNull(clientKey);
+ return clients.get(clientKey);
+ }
+
+ @Override
public void removeClient(DeviceId deviceId) {
checkNotNull(deviceId);
withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
}
+ @Override
+ public void removeClient(K clientKey) {
+ checkNotNull(clientKey);
+ withDeviceLock(() -> doRemoveClient(clientKey), clientKey.deviceId());
+ }
+
private Void doRemoveClient(DeviceId deviceId) {
if (clientKeys.containsKey(deviceId)) {
- final K clientKey = clientKeys.get(deviceId);
- clients.get(clientKey).shutdown();
- grpcChannelController.disconnectChannel(channelIds.get(deviceId));
- clientKeys.remove(deviceId);
- clients.remove(clientKey);
- channelIds.remove(deviceId);
+ doRemoveClient(clientKeys.get(deviceId));
}
return null;
}
- @Override
- public boolean isReachable(DeviceId deviceId) {
- checkNotNull(deviceId);
- return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
+ private Void doRemoveClient(K clientKey) {
+ if (clients.containsKey(clientKey)) {
+ clients.get(clientKey).shutdown();
+ }
+ if (channelIds.containsKey(clientKey.deviceId())) {
+ grpcChannelController.disconnectChannel(
+ channelIds.get(clientKey.deviceId()));
+ }
+ clientKeys.remove(clientKey.deviceId());
+ clients.remove(clientKey);
+ channelIds.remove(clientKey.deviceId());
+ return null;
}
- private boolean doIsReachable(DeviceId deviceId) {
- // Default behaviour checks only the gRPC channel, should
- // check according to different gRPC service
- if (!clientKeys.containsKey(deviceId)) {
- log.debug("Missing client for {}, cannot check for reachability", deviceId);
- return false;
+ @Override
+ public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
+ checkNotNull(deviceId, "deviceId cannot be null");
+ checkNotNull(providerId, "providerId cannot be null");
+ checkNotNull(listener, "listener cannot be null");
+ deviceAgentListeners.putIfAbsent(deviceId, Maps.newConcurrentMap());
+ deviceAgentListeners.get(deviceId).put(providerId, listener);
+ }
+
+ @Override
+ public void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId) {
+ checkNotNull(deviceId, "deviceId cannot be null");
+ checkNotNull(providerId, "providerId cannot be null");
+ deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
+ listeners.remove(providerId);
+ return listeners.isEmpty() ? null : listeners;
+ });
+ }
+
+ public void postEvent(E event) {
+ checkNotNull(event);
+ post(event);
+ }
+
+ public void postEvent(DeviceAgentEvent event) {
+ // We should have only one event delivery mechanism. We have two now
+ // because we have two different types of events, DeviceAgentEvent and
+ // controller/protocol specific ones (e.g. P4Runtime or gNMI).
+ // TODO: extend device agent event to allow delivery protocol-specific
+ // events, e.g. packet-in
+ checkNotNull(event);
+ if (deviceAgentListeners.containsKey(event.subject())) {
+ deviceAgentListeners.get(event.subject()).values()
+ .forEach(l -> l.event(event));
}
- return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
}
private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index ca0c8c3..87c9426 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -41,6 +41,7 @@
import java.util.Dictionary;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -152,50 +153,24 @@
}
}
- private boolean doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
+ private void doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
.newBlockingStub(channel)
.withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
try {
- return dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
- .getDefaultInstance()) != null;
+ //noinspection ResultOfMethodCallIgnored
+ dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
+ .getDefaultInstance());
} catch (StatusRuntimeException e) {
- if (e.getStatus().equals(Status.UNIMPLEMENTED)) {
+ if (e.getStatus().getCode() != Status.Code.UNIMPLEMENTED) {
// UNIMPLEMENTED means that the server received our message but
// doesn't know how to handle it. Hence, channel is open.
- return true;
- } else {
throw e;
}
}
}
}
@Override
- public boolean isChannelOpen(GrpcChannelId channelId) {
- checkNotNull(channelId);
-
- Lock lock = channelLocks.get(channelId);
- lock.lock();
-
- try {
- if (!channels.containsKey(channelId)) {
- log.warn("Unknown channel ID '{}', can't check if channel is open",
- channelId);
- return false;
- }
- try {
- return doDummyMessage(channels.get(channelId));
- } catch (StatusRuntimeException e) {
- log.debug("Unable to send dummy message to {}: {}",
- channelId, e.toString());
- return false;
- }
- } finally {
- lock.unlock();
- }
- }
-
- @Override
public void disconnectChannel(GrpcChannelId channelId) {
checkNotNull(channelId);
@@ -240,7 +215,6 @@
Lock lock = channelLocks.get(channelId);
lock.lock();
-
try {
return Optional.ofNullable(channels.get(channelId));
} finally {
@@ -248,4 +222,23 @@
}
}
+ @Override
+ public CompletableFuture<Boolean> probeChannel(GrpcChannelId channelId) {
+ final ManagedChannel channel = channels.get(channelId);
+ if (channel == null) {
+ log.warn("Unable to find any channel with ID {}, cannot send probe",
+ channelId);
+ return CompletableFuture.completedFuture(false);
+ }
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ doDummyMessage(channel);
+ return true;
+ } catch (StatusRuntimeException e) {
+ log.debug("Probe for {} failed", channelId);
+ log.debug("", e);
+ return false;
+ }
+ });
+ }
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
index 4c23bbc..452e978 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
@@ -20,28 +20,58 @@
import org.onosproject.grpc.api.GrpcClientKey;
import org.onosproject.net.DeviceId;
-import java.util.Objects;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
+
+import static java.lang.String.format;
/**
* Key that uniquely identifies a P4Runtime client.
*/
@Beta
public final class P4RuntimeClientKey extends GrpcClientKey {
+
+ private static final String DEVICE_ID_PARAM = "device_id=";
+
private static final String P4RUNTIME = "P4Runtime";
private final long p4DeviceId;
/**
- * Creates a new client key.
+ * Creates a new client key. The server URI is expected to carry the
+ * P4Runtime server-internal 'device_id' as a param in the query string. For
+ * example, grpc://10.0.0.1:5001?device_id=1
*
- * @param deviceId ONOS device ID
- * @param serverAddr P4Runtime server address
- * @param serverPort P4Runtime server port
- * @param p4DeviceId P4Runtime server-internal device ID
+ * @param deviceId ONOS device ID
+ * @param serverUri P4Runtime server URI
*/
- public P4RuntimeClientKey(DeviceId deviceId, String serverAddr,
- int serverPort, long p4DeviceId) {
- super(P4RUNTIME, deviceId, serverAddr, serverPort);
- this.p4DeviceId = p4DeviceId;
+ public P4RuntimeClientKey(DeviceId deviceId, URI serverUri) {
+ super(P4RUNTIME, deviceId, serverUri);
+ this.p4DeviceId = extractP4DeviceId(serverUri);
+ }
+
+ private static long extractP4DeviceId(URI uri) {
+ String[] segments = uri.getRawQuery() == null ? new String[0] : uri.getRawQuery().split("&");
+ try {
+ for (String s : segments) {
+ if (s.startsWith(DEVICE_ID_PARAM)) {
+ return Long.parseUnsignedLong(
+ URLDecoder.decode(
+ s.substring(DEVICE_ID_PARAM.length()), "utf-8"));
+ }
+ }
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalArgumentException(format(
+ "Unable to decode P4Runtime-internal device_id from URI %s: %s",
+ uri, e.toString()), e);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(format(
+ "Invalid P4Runtime-internal device_id in URI %s: %s",
+ uri, e.toString()), e);
+ }
+ throw new IllegalArgumentException(format(
+ "Missing P4Runtime-internal device_id in URI %s",
+ uri));
+ }
/**
@@ -52,29 +82,4 @@
public long p4DeviceId() {
return p4DeviceId;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- P4RuntimeClientKey that = (P4RuntimeClientKey) o;
- return p4DeviceId == that.p4DeviceId;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), p4DeviceId);
- }
-
- @Override
- public String toString() {
- return super.toString() + "/" + p4DeviceId;
- }
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 0bc8ed6..18e925a 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -19,33 +19,13 @@
import com.google.common.annotations.Beta;
import org.onosproject.event.ListenerService;
import org.onosproject.grpc.api.GrpcClientController;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceAgentListener;
-import org.onosproject.net.provider.ProviderId;
/**
- * Controller of P4Runtime devices.
+ * Controller of P4Runtime clients.
*/
@Beta
public interface P4RuntimeController
extends GrpcClientController<P4RuntimeClientKey, P4RuntimeClient>,
ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
- /**
- * Adds a listener for device agent events for the given provider.
- *
- * @param deviceId device identifier
- * @param providerId provider ID
- * @param listener the device agent listener
- */
- void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId,
- DeviceAgentListener listener);
- /**
- * Removes the listener for device agent events that was previously
- * registered for the given provider.
- *
- * @param deviceId device identifier
- * @param providerId the provider ID
- */
- void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId);
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
index 4e0ea60b..2b3db65 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
@@ -33,21 +33,6 @@
* A packet-in.
*/
PACKET_IN,
-
- /**
- * Arbitration reply.
- */
- ARBITRATION_RESPONSE,
-
- /**
- * Channel Event.
- */
- CHANNEL_EVENT,
-
- /**
- * Permission denied (not master).
- */
- PERMISSION_DENIED,
}
public P4RuntimeEvent(Type type, P4RuntimeEventSubject subject) {
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java
index 34dac66..81928c1 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java
@@ -89,4 +89,25 @@
PiPipeconf pipeconf, ByteBuffer deviceData) {
return Futures.getUnchecked(isPipelineConfigSet(pipeconf, deviceData));
}
+
+ /**
+ * Returns true if the device has a pipeline config set, false otherwise.
+ * <p>
+ * This method is expected to return {@code true} if invoked after
+ * successfully calling {@link #setPipelineConfig(PiPipeconf, ByteBuffer)}
+ * with any parameter.
+ *
+ * @return completable future, true if the device has a pipeline config set,
+ * false otherwise.
+ */
+ CompletableFuture<Boolean> isAnyPipelineConfigSet();
+
+ /**
+ * Same as {@link #isAnyPipelineConfigSet()}, but blocks execution.
+ *
+ * @return true if the device has a pipeline config set, false otherwise.
+ */
+ default boolean isAnyPipelineConfigSetSync() {
+ return Futures.getUnchecked(isAnyPipelineConfigSet());
+ }
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java
index 1317e29..555f906 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java
@@ -19,6 +19,8 @@
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.runtime.PiPacketOperation;
+import java.math.BigInteger;
+
/**
* P4Runtime client interface for the StreamChannel RPC. It allows management of
* the P4Runtime session (open/close, mastership arbitration) as well as sending
@@ -30,23 +32,26 @@
public interface P4RuntimeStreamClient {
/**
- * Opens a session to the server by starting the Stream RPC and sending a
- * mastership arbitration update message with an election ID that is
- * expected to be unique among all available clients. If a client has been
- * requested to become master via {@link #runForMastership()}, then this
- * method should pick an election ID that is lower than the one currently
- * associated with the master client.
+ * Opportunistically opens a session with the server by starting a
+ * StreamChannel RPC and sends a {@code MasterArbitrationUpdate} message
+ * with the given election ID. The {@code master} boolean flag is used to
+ * indicate if we are trying to become master or not. If false, the
+ * implementation might delay sending the {@code MasterArbitrationUpdate}
+ * message until another node becomes master with a higher election ID.
* <p>
- * If the server acknowledges the session to this client as open, the {@link
+ * If the server acknowledges this client as master, the {@link
* P4RuntimeController} is expected to generate a {@link
* org.onosproject.net.device.DeviceAgentEvent} with type {@link
- * org.onosproject.net.device.DeviceAgentEvent.Type#CHANNEL_OPEN}.
+ * org.onosproject.net.device.DeviceAgentEvent.Type#ROLE_MASTER}.
+ *
+ * @param master true if we are trying to become master
+ * @param electionId election ID
*/
- void openSession();
+ void setMastership(boolean master, BigInteger electionId);
/**
- * Returns true if the Stream RPC is active and the P4Runtime session is
- * open, false otherwise.
+ * Returns true if the StreamChannel RPC is active and hence the P4Runtime
+ * session is open, false otherwise.
*
* @return boolean
*/
@@ -58,17 +63,6 @@
void closeSession();
/**
- * Sends a master arbitration update to the device with a new election ID
- * that is expected to be the highest one between all clients.
- * <p>
- * If the server acknowledges this client as master, the {@link
- * P4RuntimeController} is expected to generate a {@link
- * org.onosproject.net.device.DeviceAgentEvent} with type {@link
- * org.onosproject.net.device.DeviceAgentEvent.Type#ROLE_MASTER}.
- */
- void runForMastership();
-
- /**
* Returns true if this client is master for the server, false otherwise.
*
* @return boolean
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
index 353d44e..d06e3fc 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
@@ -17,21 +17,25 @@
package org.onosproject.p4runtime.ctl.client;
import io.grpc.ManagedChannel;
+import io.grpc.Status;
import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.runtime.PiPacketOperation;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeClientKey;
-import org.onosproject.p4runtime.api.P4RuntimeEvent;
-import org.onosproject.p4runtime.ctl.controller.BaseEventSubject;
-import org.onosproject.p4runtime.ctl.controller.ChannelEvent;
+import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
import p4.v1.P4RuntimeGrpc;
import p4.v1.P4RuntimeOuterClass;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigResponse;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -39,6 +43,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
+import static p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest.ResponseType.COOKIE_ONLY;
/**
* Implementation of P4RuntimeClient.
@@ -59,7 +64,6 @@
static final int LONG_TIMEOUT_SECONDS = 60;
private final long p4DeviceId;
- private final ManagedChannel channel;
private final P4RuntimeControllerImpl controller;
private final StreamClientImpl streamClient;
private final PipelineConfigClientImpl pipelineConfigClient;
@@ -67,25 +71,27 @@
/**
* Instantiates a new client with the given arguments.
*
- * @param clientKey client key
- * @param channel gRPC managed channel
- * @param controller P$Runtime controller instance
- * @param pipeconfService pipeconf service instance
+ * @param clientKey client key
+ * @param channel gRPC managed channel
+ * @param controller P4Runtime controller instance
+ * @param pipeconfService pipeconf service instance
+ * @param masterElectionIdStore master election ID store
*/
public P4RuntimeClientImpl(P4RuntimeClientKey clientKey,
ManagedChannel channel,
P4RuntimeControllerImpl controller,
- PiPipeconfService pipeconfService) {
- super(clientKey);
+ PiPipeconfService pipeconfService,
+ MasterElectionIdStore masterElectionIdStore) {
+ super(clientKey, channel, true, controller);
checkNotNull(channel);
checkNotNull(controller);
checkNotNull(pipeconfService);
+ checkNotNull(masterElectionIdStore);
this.p4DeviceId = clientKey.p4DeviceId();
- this.channel = channel;
this.controller = controller;
this.streamClient = new StreamClientImpl(
- pipeconfService, this, controller);
+ pipeconfService, masterElectionIdStore, this, controller);
this.pipelineConfigClient = new PipelineConfigClientImpl(this);
}
@@ -108,13 +114,13 @@
}
@Override
- public ReadRequest read(PiPipeconf pipeconf) {
- return new ReadRequestImpl(this, pipeconf);
+ public CompletableFuture<Boolean> isAnyPipelineConfigSet() {
+ return pipelineConfigClient.isAnyPipelineConfigSet();
}
@Override
- public void openSession() {
- streamClient.openSession();
+ public ReadRequest read(PiPipeconf pipeconf) {
+ return new ReadRequestImpl(this, pipeconf);
}
@Override
@@ -128,8 +134,8 @@
}
@Override
- public void runForMastership() {
- streamClient.runForMastership();
+ public void setMastership(boolean master, BigInteger newElectionId) {
+ streamClient.setMastership(master, newElectionId);
}
@Override
@@ -147,6 +153,44 @@
return new WriteRequestImpl(this, pipeconf);
}
+ @Override
+ public CompletableFuture<Boolean> probeService() {
+ final CompletableFuture<Boolean> future = new CompletableFuture<>();
+ final StreamObserver<GetForwardingPipelineConfigResponse> responseObserver =
+ new StreamObserver<GetForwardingPipelineConfigResponse>() {
+ @Override
+ public void onNext(GetForwardingPipelineConfigResponse value) {
+ future.complete(true);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (Status.fromThrowable(t).getCode() ==
+ Status.Code.FAILED_PRECONDITION) {
+ // Pipeline not set but service is available.
+ future.complete(true);
+ } else {
+ log.debug("", t);
+ future.complete(false);
+ }
+
+ @Override
+ public void onCompleted() {
+ // Ignore, unary call.
+ }
+ };
+ // Uses the short timeout; note that a server ignoring COOKIE_ONLY may
+ // return the full P4 blob, which could be slow over some networks.
+ execRpc(s -> s.getForwardingPipelineConfig(
+ GetForwardingPipelineConfigRequest.newBuilder()
+ .setDeviceId(p4DeviceId)
+ .setResponseType(COOKIE_ONLY)
+ .build(), responseObserver),
+ SHORT_TIMEOUT_SECONDS);
+ return future;
+ }
+
/**
* Returns the P4Runtime-internal device ID associated with this client.
*
@@ -184,7 +228,8 @@
* @param stubConsumer P4Runtime stub consumer
* @param timeout timeout in seconds
*/
- void execRpc(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer, int timeout) {
+ void execRpc(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer,
+ int timeout) {
if (log.isTraceEnabled()) {
log.trace("Executing RPC with timeout {} seconds (context deadline {})...",
timeout, context().getDeadline());
@@ -235,21 +280,10 @@
}
private void checkGrpcException(StatusRuntimeException sre) {
- switch (sre.getStatus().getCode()) {
- case PERMISSION_DENIED:
- // Notify upper layers that this node is not master.
- controller.postEvent(new P4RuntimeEvent(
- P4RuntimeEvent.Type.PERMISSION_DENIED,
- new BaseEventSubject(deviceId)));
- break;
- case UNAVAILABLE:
- // Channel might be closed.
- controller.postEvent(new P4RuntimeEvent(
- P4RuntimeEvent.Type.CHANNEL_EVENT,
- new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
- break;
- default:
- break;
+ if (sre.getStatus().getCode() == Status.Code.PERMISSION_DENIED) {
+ // Notify upper layers that this node is not master.
+ controller.postEvent(new DeviceAgentEvent(
+ DeviceAgentEvent.Type.NOT_MASTER, deviceId));
}
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
index c023f26..b0b6c57 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
@@ -18,6 +18,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;
+import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.p4runtime.api.P4RuntimePipelineConfigClient;
@@ -32,11 +33,13 @@
import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigResponse;
import java.nio.ByteBuffer;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl.LONG_TIMEOUT_SECONDS;
+import static org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl.SHORT_TIMEOUT_SECONDS;
import static org.slf4j.LoggerFactory.getLogger;
import static p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest.ResponseType.COOKIE_ONLY;
import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
@@ -62,6 +65,12 @@
public CompletableFuture<Boolean> setPipelineConfig(
PiPipeconf pipeconf, ByteBuffer deviceData) {
+ if (!client.isSessionOpen()) {
+ log.warn("Dropping set pipeline config request for {}, session is CLOSED",
+ client.deviceId());
+ return completedFuture(false);
+ }
+
log.info("Setting pipeline config for {} to {}...",
client.deviceId(), pipeconf.id());
@@ -98,11 +107,13 @@
// All good, pipeline is set.
future.complete(true);
}
+
@Override
public void onError(Throwable t) {
client.handleRpcError(t, "SET-pipeline-config");
future.complete(false);
}
+
@Override
public void onCompleted() {
// Ignore, unary call.
@@ -154,6 +165,11 @@
pipeconf, expectedDeviceData, cfgFromDevice));
}
+ @Override
+ public CompletableFuture<Boolean> isAnyPipelineConfigSet() {
+ return getPipelineCookieFromServer().thenApply(Objects::nonNull);
+ }
+
private boolean comparePipelineConfig(
PiPipeconf pipeconf, ByteBuffer expectedDeviceData,
ForwardingPipelineConfig cfgFromDevice) {
@@ -209,17 +225,38 @@
public void onNext(GetForwardingPipelineConfigResponse value) {
if (value.hasConfig()) {
future.complete(value.getConfig());
+ if (!value.getConfig().getP4DeviceConfig().isEmpty()) {
+ log.warn("{} returned GetForwardingPipelineConfigResponse " +
+ "with p4_device_config field set " +
+ "({} bytes), but we requested COOKIE_ONLY",
+ client.deviceId(),
+ value.getConfig().getP4DeviceConfig().size());
+ }
+ if (value.getConfig().hasP4Info()) {
+ log.warn("{} returned GetForwardingPipelineConfigResponse " +
+ "with p4_info field set " +
+ "({} bytes), but we requested COOKIE_ONLY",
+ client.deviceId(),
+ value.getConfig().getP4Info().getSerializedSize());
+ }
} else {
+ future.complete(null);
log.warn("{} returned {} with 'config' field unset",
client.deviceId(), value.getClass().getSimpleName());
}
- future.complete(null);
}
@Override
public void onError(Throwable t) {
- client.handleRpcError(t, "GET-pipeline-config");
future.complete(null);
+ if (Status.fromThrowable(t).getCode() ==
+ Status.Code.FAILED_PRECONDITION) {
+ // FAILED_PRECONDITION means that a pipeline
+ // config was not set in the first place, don't
+ // bother logging.
+ return;
+ }
+ client.handleRpcError(t, "GET-pipeline-config");
}
@Override
@@ -231,7 +268,7 @@
// (e.g. server does not support cookie), over a slow network.
client.execRpc(
s -> s.getForwardingPipelineConfig(request, responseObserver),
- LONG_TIMEOUT_SECONDS);
+ SHORT_TIMEOUT_SECONDS);
return future;
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
index 7afd97b..1bf195c 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
@@ -22,14 +22,15 @@
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.runtime.PiPacketOperation;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.api.P4RuntimeStreamClient;
import org.onosproject.p4runtime.ctl.codec.CodecException;
-import org.onosproject.p4runtime.ctl.controller.ArbitrationUpdateEvent;
-import org.onosproject.p4runtime.ctl.controller.ChannelEvent;
+import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
+import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore.MasterElectionIdListener;
import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
import org.onosproject.p4runtime.ctl.controller.PacketInEvent;
import org.slf4j.Logger;
@@ -42,6 +43,8 @@
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
import static org.slf4j.LoggerFactory.getLogger;
@@ -54,73 +57,95 @@
private static final Logger log = getLogger(StreamClientImpl.class);
- private static final BigInteger ONE_THOUSAND = BigInteger.valueOf(1000);
-
private final P4RuntimeClientImpl client;
private final DeviceId deviceId;
private final long p4DeviceId;
private final PiPipeconfService pipeconfService;
+ private final MasterElectionIdStore masterElectionIdStore;
private final P4RuntimeControllerImpl controller;
+
private final StreamChannelManager streamChannelManager = new StreamChannelManager();
+ private final MasterElectionIdListener masterElectionIdListener = new InternalMasterElectionIdListener();
- private P4RuntimeOuterClass.Uint128 lastUsedElectionId = P4RuntimeOuterClass.Uint128
- .newBuilder().setLow(1).build();
+ private final AtomicBoolean isMaster = new AtomicBoolean(false);
+ private final AtomicBoolean requestedToBeMaster = new AtomicBoolean(false);
- private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
+ private BigInteger pendingElectionId = null;
+ private BigInteger lastUsedElectionId = null;
StreamClientImpl(
PiPipeconfService pipeconfService,
+ MasterElectionIdStore masterElectionIdStore,
P4RuntimeClientImpl client,
P4RuntimeControllerImpl controller) {
this.client = client;
this.deviceId = client.deviceId();
this.p4DeviceId = client.p4DeviceId();
this.pipeconfService = pipeconfService;
+ this.masterElectionIdStore = masterElectionIdStore;
this.controller = controller;
}
@Override
- public void openSession() {
- if (isSessionOpen()) {
- log.debug("Dropping request to open session for {}, session is already open",
- deviceId);
- return;
- }
- log.debug("Opening session for {}...", deviceId);
- sendMasterArbitrationUpdate(controller.newMasterElectionId(deviceId));
-
- }
-
- @Override
public boolean isSessionOpen() {
return streamChannelManager.isOpen();
}
@Override
public void closeSession() {
- streamChannelManager.complete();
+ synchronized (requestedToBeMaster) {
+ this.masterElectionIdStore.unsetListener(deviceId);
+ streamChannelManager.teardown();
+ pendingElectionId = null;
+ requestedToBeMaster.set(false);
+ isMaster.set(false);
+ }
}
@Override
- public void runForMastership() {
- if (!isSessionOpen()) {
- log.debug("Dropping mastership request for {}, session is closed",
- deviceId);
- return;
+ public void setMastership(boolean master, BigInteger newElectionId) {
+ checkNotNull(newElectionId);
+ checkArgument(newElectionId.compareTo(BigInteger.ZERO) > 0,
+ "newElectionId must be a non zero positive number");
+ synchronized (requestedToBeMaster) {
+ requestedToBeMaster.set(master);
+ pendingElectionId = newElectionId;
+ handlePendingElectionId(masterElectionIdStore.get(deviceId));
}
- // Becoming master is a race. Here we increase our chances of win, i.e.
- // using the highest election ID, against other ONOS nodes in the
- // cluster that are calling openSession() (which is used to start the
- // stream RPC session, not to become master).
- log.debug("Running for mastership on {}...", deviceId);
- final BigInteger masterId = controller.newMasterElectionId(deviceId)
- .add(ONE_THOUSAND);
- sendMasterArbitrationUpdate(masterId);
+ }
+
+ private void handlePendingElectionId(BigInteger masterElectionId) {
+ synchronized (requestedToBeMaster) {
+ if (pendingElectionId == null) {
+ // No pending requests.
+ return;
+ }
+ if (!requestedToBeMaster.get() && masterElectionId != null
+ && pendingElectionId.compareTo(masterElectionId) > 0) {
+ log.info("Deferring sending master arbitration update, master " +
+ "election ID of server ({}) is smaller than " +
+ "requested one ({}), but we do NOT want to be master...",
+ masterElectionId, pendingElectionId);
+ // Will try again as soon as the server reports a new master
+ // election ID that is bigger than the pending non-master one.
+ masterElectionIdStore.setListener(deviceId, masterElectionIdListener);
+ } else {
+ // Send now.
+ log.info("Setting mastership on {}... " +
+ "master={}, newElectionId={}, masterElectionId={}",
+ deviceId, requestedToBeMaster.get(),
+ pendingElectionId, masterElectionId);
+ sendMasterArbitrationUpdate(pendingElectionId);
+ pendingElectionId = null;
+ // No need to listen for master election ID changes.
+ masterElectionIdStore.unsetListener(deviceId);
+ }
+ }
}
@Override
public boolean isMaster() {
- return streamChannelManager.isOpen() && isClientMaster.get();
+ return isMaster.get();
}
@Override
@@ -141,7 +166,7 @@
final StreamMessageRequest packetOutRequest = StreamMessageRequest
.newBuilder().setPacket(packetOut).build();
// Send.
- streamChannelManager.sendIfOpen(packetOutRequest);
+ streamChannelManager.send(packetOutRequest);
} catch (CodecException e) {
log.error("Unable to send packet-out: {}", e.getMessage());
}
@@ -160,7 +185,7 @@
.setElectionId(idMsg)
.build())
.build());
- lastUsedElectionId = idMsg;
+ lastUsedElectionId = electionId;
}
private P4RuntimeOuterClass.Uint128 bigIntegerToUint128(BigInteger value) {
@@ -220,13 +245,22 @@
if (!msg.hasElectionId() || !msg.hasStatus()) {
return;
}
- final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
- log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
- deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
- controller.postEvent(new P4RuntimeEvent(
- P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
- new ArbitrationUpdateEvent(deviceId, isMaster)));
- isClientMaster.set(isMaster);
+ // Is this client master?
+ isMaster.set(msg.getStatus().getCode() == Status.OK.getCode().value());
+ // Notify all nodes of new master election IDs via the distributed store.
+ // This is required for nodes that do not have a Stream RPC open and
+ // that would otherwise be unaware of changes, keeping their pending
+ // mastership operations waiting forever.
+ final BigInteger masterElectionId = uint128ToBigInteger(msg.getElectionId());
+ masterElectionIdStore.set(deviceId, masterElectionId);
+
+ log.debug("Received arbitration update from {}: isMaster={}, masterElectionId={}",
+ deviceId, isMaster.get(), masterElectionId);
+
+ // Post mastership event via controller.
+ controller.postEvent(new DeviceAgentEvent(
+ isMaster.get() ? DeviceAgentEvent.Type.ROLE_MASTER
+ : DeviceAgentEvent.Type.ROLE_STANDBY, deviceId));
}
/**
@@ -236,7 +270,22 @@
* @return election ID uint128 protobuf message
*/
P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
- return lastUsedElectionId;
+ return lastUsedElectionId == null
+ ? P4RuntimeOuterClass.Uint128.getDefaultInstance()
+ : bigIntegerToUint128(lastUsedElectionId);
+ }
+
+ /**
+ * Handles updates of the master election ID by applying any pending
+ * mastership operation.
+ */
+ private class InternalMasterElectionIdListener
+ implements MasterElectionIdStore.MasterElectionIdListener {
+
+ @Override
+ public void updated(BigInteger masterElectionId) {
+ handlePendingElectionId(masterElectionId);
+ }
}
/**
@@ -258,23 +307,12 @@
}
}
- void sendIfOpen(StreamMessageRequest value) {
- // We do not lock here, but we ignore NPEs due to stream RPC not
- // being active (null requestObserver). Good for frequent
- // packet-outs.
- try {
- doSend(value);
- } catch (NullPointerException e) {
- if (requestObserver != null) {
- // Must be something else.
- throw e;
- }
- }
- }
-
private void doSend(StreamMessageRequest value) {
try {
requestObserver.onNext(value);
+ // Optimistically set the session as open. In case of errors, it
+ // will be closed by the response stream observer.
+ streamChannelManager.signalOpen();
} catch (Throwable ex) {
if (ex instanceof StatusRuntimeException) {
log.warn("Unable to send {} to {}: {}",
@@ -283,7 +321,7 @@
log.error("Exception while sending {} to {}: {}",
value.getUpdateCase().toString(), deviceId, ex);
}
- complete();
+ teardown();
}
}
@@ -299,7 +337,7 @@
}
}
- void complete() {
+ void teardown() {
synchronized (this) {
signalClosed();
if (requestObserver != null) {
@@ -311,23 +349,16 @@
}
void signalOpen() {
- synchronized (this) {
- final boolean wasOpen = open.getAndSet(true);
- if (!wasOpen) {
- controller.postEvent(new P4RuntimeEvent(
- P4RuntimeEvent.Type.CHANNEL_EVENT,
- new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
- }
- }
+ open.set(true);
}
void signalClosed() {
synchronized (this) {
final boolean wasOpen = open.getAndSet(false);
if (wasOpen) {
- controller.postEvent(new P4RuntimeEvent(
- P4RuntimeEvent.Type.CHANNEL_EVENT,
- new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
+ // We lost any valid mastership role.
+ controller.postEvent(new DeviceAgentEvent(
+ DeviceAgentEvent.Type.ROLE_NONE, deviceId));
}
}
}
@@ -352,13 +383,11 @@
@Override
public void onNext(StreamMessageResponse message) {
- streamChannelManager.signalOpen();
try {
if (log.isTraceEnabled()) {
- log.trace(
- "Received {} from {}: {}",
- message.getUpdateCase(), deviceId,
- TextFormat.shortDebugString(message));
+ log.trace("Received {} from {}: {}",
+ message.getUpdateCase(), deviceId,
+ TextFormat.shortDebugString(message));
}
switch (message.getUpdateCase()) {
case PACKET:
@@ -388,17 +417,18 @@
log.warn("Error on stream channel for {}: {}",
deviceId, throwable.getMessage());
}
+ log.debug("", throwable);
} else {
log.error(format("Exception on stream channel for %s",
deviceId), throwable);
}
- streamChannelManager.complete();
+ streamChannelManager.teardown();
}
@Override
public void onCompleted() {
log.warn("Stream channel for {} has completed", deviceId);
- streamChannelManager.complete();
+ streamChannelManager.teardown();
}
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ChannelEvent.java
deleted file mode 100644
index 0a77e46..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ChannelEvent.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl.controller;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
-
-/**
- * Channel event in P4Runtime.
- */
-public final class ChannelEvent implements P4RuntimeEventSubject {
-
- public enum Type {
- OPEN,
- CLOSED,
- ERROR
- }
-
- private DeviceId deviceId;
- private Type type;
-
- /**
- * Creates channel event with given status and throwable.
- *
- * @param deviceId the device
- * @param type error type
- */
- public ChannelEvent(DeviceId deviceId, Type type) {
- this.deviceId = deviceId;
- this.type = type;
- }
-
- /**
- * Gets the type of this event.
- *
- * @return the error type
- */
- public Type type() {
- return type;
- }
-
- @Override
- public DeviceId deviceId() {
- return deviceId;
- }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedElectionIdGenerator.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedElectionIdGenerator.java
deleted file mode 100644
index 980ab11..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedElectionIdGenerator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl.controller;
-
-import org.onlab.util.KryoNamespace;
-import org.onosproject.net.DeviceId;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.AtomicCounterMap;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.slf4j.Logger;
-
-import java.math.BigInteger;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Distributed implementation of a generator of P4Runtime election IDs.
- */
-final class DistributedElectionIdGenerator {
-
- private final Logger log = getLogger(this.getClass());
-
- // FIXME: counter map use long, but P4Runtime accepts 128bit election IDs
- private AtomicCounterMap<DeviceId> electionIds;
-
- /**
- * Creates a new election ID generator using the given storage service.
- *
- * @param storageService storage service
- */
- DistributedElectionIdGenerator(StorageService storageService) {
- KryoNamespace serializer = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .build();
- this.electionIds = storageService.<DeviceId>atomicCounterMapBuilder()
- .withName("p4runtime-election-ids")
- .withSerializer(Serializer.using(serializer))
- .build();
- }
-
- /**
- * Returns an election ID for the given device ID. The first election ID for
- * a given device ID is always 1.
- *
- * @param deviceId device ID
- * @return new election ID
- */
- BigInteger generate(DeviceId deviceId) {
- if (electionIds == null) {
- return null;
- }
- // Default value is 0 for AtomicCounterMap.
- return BigInteger.valueOf(electionIds.incrementAndGet(deviceId));
- }
-
- /**
- * Destroy the backing distributed primitive of this generator.
- */
- void destroy() {
- try {
- electionIds.destroy().get(10, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- log.error("Exception while destroying distributed counter map", e);
- } finally {
- electionIds = null;
- }
- }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
new file mode 100644
index 0000000..dff2330
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.controller;
+
+import com.google.common.collect.Maps;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.concurrent.ConcurrentMap;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Distributed implementation of MasterElectionIdStore.
+ */
+@Component(immediate = true, service = MasterElectionIdStore.class)
+public class DistributedMasterElectionIdStore implements MasterElectionIdStore {
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ private static final KryoNamespace SERIALIZER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(BigInteger.class)
+ .build();
+
+ private final Logger log = getLogger(getClass());
+ private final EventuallyConsistentMapListener<DeviceId, BigInteger> mapListener =
+ new InternalMapListener();
+
+ private EventuallyConsistentMap<DeviceId, BigInteger> masterElectionIds;
+ private ConcurrentMap<DeviceId, MasterElectionIdListener> listeners =
+ Maps.newConcurrentMap();
+
+ @Activate
+ public void activate() {
+ this.listeners = Maps.newConcurrentMap();
+ this.masterElectionIds = storageService.<DeviceId, BigInteger>eventuallyConsistentMapBuilder()
+ .withName("p4runtime-master-election-ids")
+ .withSerializer(SERIALIZER)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .build();
+ this.masterElectionIds.addListener(mapListener);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ this.masterElectionIds.removeListener(mapListener);
+ this.masterElectionIds.destroy();
+ this.masterElectionIds = null;
+ this.listeners.clear();
+ this.listeners = null;
+ log.info("Stopped");
+ }
+
+
+ @Override
+ public void set(DeviceId deviceId, BigInteger electionId) {
+ checkNotNull(deviceId);
+ checkNotNull(electionId);
+ this.masterElectionIds.put(deviceId, electionId);
+ }
+
+ @Override
+ public BigInteger get(DeviceId deviceId) {
+ checkNotNull(deviceId);
+ return this.masterElectionIds.get(deviceId);
+ }
+
+ @Override
+ public void remove(DeviceId deviceId) {
+ checkNotNull(deviceId);
+ this.masterElectionIds.remove(deviceId);
+ }
+
+ @Override
+ public void setListener(DeviceId deviceId, MasterElectionIdListener newListener) {
+ checkNotNull(deviceId);
+ checkNotNull(newListener);
+ listeners.compute(deviceId, (did, existingListener) -> {
+ if (existingListener == null || existingListener == newListener) {
+ return newListener;
+ } else {
+ log.error("Cannot add listener as one already exist for {}", deviceId);
+ return existingListener;
+ }
+ });
+ }
+
+ @Override
+ public void unsetListener(DeviceId deviceId) {
+ listeners.remove(deviceId);
+ }
+
+ private class InternalMapListener implements EventuallyConsistentMapListener<DeviceId, BigInteger> {
+ @Override
+ public void event(EventuallyConsistentMapEvent<DeviceId, BigInteger> event) {
+ final MasterElectionIdListener listener = listeners.get(event.key());
+ if (listener == null) {
+ return;
+ }
+ switch (event.type()) {
+ case PUT:
+ listener.updated(event.value());
+ break;
+ case REMOVE:
+ listener.updated(null);
+ break;
+ default:
+ log.error("Unrecognized map event type {}", event.type());
+ }
+ }
+ }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
new file mode 100644
index 0000000..f5393d1
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.controller;
+
+import org.onosproject.net.DeviceId;
+
+import java.math.BigInteger;
+
+/**
+ * Store that keeps track of master election IDs for each device.
+ */
+public interface MasterElectionIdStore {
+
+ /**
+ * Sets the master election ID for the given device.
+ *
+ * @param deviceId device ID
+ * @param electionId election ID
+ */
+ void set(DeviceId deviceId, BigInteger electionId);
+
+ /**
+ * Returns the last known master election ID for the given device, or null if none is known.
+ *
+ * @param deviceId device ID
+ * @return election ID
+ */
+ BigInteger get(DeviceId deviceId);
+
+ /**
+ * Removes any state associated with the given device.
+ *
+ * @param deviceId device ID
+ */
+ void remove(DeviceId deviceId);
+
+ /**
+ * Sets a listener for the given device that will be invoked every time
+ * the master election ID changes.
+ *
+ * @param deviceId device ID
+ * @param listener listener
+ */
+ void setListener(DeviceId deviceId, MasterElectionIdListener listener);
+
+ /**
+ * Unsets the listener for the given device.
+ *
+ * @param deviceId device ID
+ */
+ void unsetListener(DeviceId deviceId);
+
+ /**
+ * Listener of master election ID changes for a specific device.
+ */
+ interface MasterElectionIdListener {
+
+ /**
+ * Notifies that the master election ID has been updated to the given
+ * (nullable) value.
+ *
+ * @param masterElectionId new master election ID, or null
+ */
+ void updated(BigInteger masterElectionId);
+ }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
index affbf7d..2256d5e 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
@@ -16,33 +16,19 @@
package org.onosproject.p4runtime.ctl.controller;
-import com.google.common.collect.Maps;
import io.grpc.ManagedChannel;
import org.onosproject.grpc.ctl.AbstractGrpcClientController;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceAgentEvent;
-import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.pi.service.PiPipeconfService;
-import org.onosproject.net.provider.ProviderId;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.api.P4RuntimeEventListener;
import org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl;
-import org.onosproject.store.service.StorageService;
-import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-
-import java.math.BigInteger;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
/**
* P4Runtime controller implementation.
@@ -53,122 +39,34 @@
<P4RuntimeClientKey, P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
implements P4RuntimeController {
- private final Logger log = getLogger(getClass());
-
- private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
- deviceAgentListeners = Maps.newConcurrentMap();
-
- private DistributedElectionIdGenerator electionIdGenerator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- private StorageService storageService;
-
@Reference(cardinality = ReferenceCardinality.MANDATORY)
private PiPipeconfService pipeconfService;
- @Activate
- public void activate() {
- super.activate();
- eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
- electionIdGenerator = new DistributedElectionIdGenerator(storageService);
- log.info("Started");
- }
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private MasterElectionIdStore masterElectionIdStore;
- @Deactivate
- public void deactivate() {
- super.deactivate();
- deviceAgentListeners.clear();
- electionIdGenerator.destroy();
- electionIdGenerator = null;
- log.info("Stopped");
+ public P4RuntimeControllerImpl() {
+ super(P4RuntimeEvent.class);
}
@Override
- protected P4RuntimeClient createClientInstance(P4RuntimeClientKey clientKey, ManagedChannel channel) {
- return new P4RuntimeClientImpl(clientKey, channel, this, pipeconfService);
+ public void removeClient(DeviceId deviceId) {
+ super.removeClient(deviceId);
+ // Assuming that, when a client is removed, it is removed by all nodes,
+ // this is the best place to clear the master election ID state.
+ masterElectionIdStore.remove(deviceId);
}
@Override
- public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
- checkNotNull(deviceId, "deviceId cannot be null");
- checkNotNull(deviceId, "providerId cannot be null");
- checkNotNull(listener, "listener cannot be null");
- deviceAgentListeners.putIfAbsent(deviceId, Maps.newConcurrentMap());
- deviceAgentListeners.get(deviceId).put(providerId, listener);
+ public void removeClient(P4RuntimeClientKey clientKey) {
+ super.removeClient(clientKey);
+ masterElectionIdStore.remove(clientKey.deviceId());
}
@Override
- public void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId) {
- checkNotNull(deviceId, "deviceId cannot be null");
- checkNotNull(providerId, "listener cannot be null");
- deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
- listeners.remove(providerId);
- return listeners;
- });
- }
-
- public BigInteger newMasterElectionId(DeviceId deviceId) {
- return electionIdGenerator.generate(deviceId);
- }
-
- public void postEvent(P4RuntimeEvent event) {
- switch (event.type()) {
- case CHANNEL_EVENT:
- handleChannelEvent(event);
- break;
- case ARBITRATION_RESPONSE:
- handleArbitrationReply(event);
- break;
- case PERMISSION_DENIED:
- handlePermissionDenied(event);
- break;
- default:
- post(event);
- break;
- }
- }
-
- private void handlePermissionDenied(P4RuntimeEvent event) {
- postDeviceAgentEvent(event.subject().deviceId(), new DeviceAgentEvent(
- DeviceAgentEvent.Type.NOT_MASTER, event.subject().deviceId()));
- }
-
- private void handleChannelEvent(P4RuntimeEvent event) {
- final ChannelEvent channelEvent = (ChannelEvent) event.subject();
- final DeviceId deviceId = channelEvent.deviceId();
- final DeviceAgentEvent.Type agentEventType;
- switch (channelEvent.type()) {
- case OPEN:
- agentEventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
- break;
- case CLOSED:
- agentEventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
- break;
- case ERROR:
- agentEventType = !isReachable(deviceId)
- ? DeviceAgentEvent.Type.CHANNEL_CLOSED
- : DeviceAgentEvent.Type.CHANNEL_ERROR;
- break;
- default:
- log.warn("Unrecognized channel event type {}", channelEvent.type());
- return;
- }
- postDeviceAgentEvent(deviceId, new DeviceAgentEvent(agentEventType, deviceId));
- }
-
- private void handleArbitrationReply(P4RuntimeEvent event) {
- final DeviceId deviceId = event.subject().deviceId();
- final ArbitrationUpdateEvent response = (ArbitrationUpdateEvent) event.subject();
- final DeviceAgentEvent.Type roleType = response.isMaster()
- ? DeviceAgentEvent.Type.ROLE_MASTER
- : DeviceAgentEvent.Type.ROLE_STANDBY;
- postDeviceAgentEvent(deviceId, new DeviceAgentEvent(
- roleType, response.deviceId()));
- }
-
- private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
- if (deviceAgentListeners.containsKey(deviceId)) {
- deviceAgentListeners.get(deviceId).values().forEach(l -> l.event(event));
- }
+ protected P4RuntimeClient createClientInstance(
+ P4RuntimeClientKey clientKey, ManagedChannel channel) {
+ return new P4RuntimeClientImpl(clientKey, channel, this,
+ pipeconfService, masterElectionIdStore);
}
}
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
new file mode 100644
index 0000000..595e07d
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
+
+import java.math.BigInteger;
+
+public class MockMasterElectionIdStore implements MasterElectionIdStore {
+
+ @Override
+ public void set(DeviceId deviceId, BigInteger electionId) {
+
+ }
+
+ @Override
+ public BigInteger get(DeviceId deviceId) {
+ return null;
+ }
+
+ @Override
+ public void remove(DeviceId deviceId) {
+
+ }
+
+ @Override
+ public void setListener(DeviceId deviceId, MasterElectionIdListener listener) {
+
+ }
+
+ @Override
+ public void unsetListener(DeviceId deviceId) {
+
+ }
+}
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index c6a43cb..4c4dcdf 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -55,6 +55,8 @@
import p4.v1.P4RuntimeOuterClass.WriteRequest;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collection;
import java.util.List;
@@ -62,6 +64,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static java.lang.String.format;
import static org.easymock.EasyMock.niceMock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -105,7 +108,7 @@
private static final int SET_EGRESS_PORT_ID = 16794308;
private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest";
private static final long DEFAULT_TIMEOUT_TIME = 10;
- private static final Uint128 DEFAULT_ELECTION_ID = Uint128.newBuilder().setLow(1).build();
+ private static final Uint128 DEFAULT_ELECTION_ID = Uint128.getDefaultInstance();
private static final String P4R_IP = "127.0.0.1";
private static final int P4R_PORT = 50010;
@@ -157,10 +160,13 @@
@Before
- public void setup() {
+ public void setup() throws URISyntaxException {
controller = niceMock(org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl.class);
- P4RuntimeClientKey clientKey = new P4RuntimeClientKey(DEVICE_ID, P4R_IP, P4R_PORT, P4_DEVICE_ID);
- client = new P4RuntimeClientImpl(clientKey, grpcChannel, controller, new MockPipeconfService());
+ P4RuntimeClientKey clientKey = new P4RuntimeClientKey(DEVICE_ID, new URI(
+ format("grpc://%s:%d?device_id=%d", P4R_IP, P4R_PORT, P4_DEVICE_ID)));
+ client = new P4RuntimeClientImpl(
+ clientKey, grpcChannel, controller, new MockPipeconfService(),
+ new MockMasterElectionIdStore());
}
@Test