Allow sharing the same gRPC channel between clients
This change introduces a refactoring of the gRPC protocol subsystem that
allows the creation of a gRPC channel independently of the client, while
allowing multiple clients to share the same channel (e.g. as in Stratum
where we use 3 clients).
Moreover, we refactor the P4RuntimeClient API to support multiple
P4Runtime-internal device IDs using the same client, whereas before the
client was associated with only one such ID.
Finally, we provide an abstract implementation for gRPC-based driver
behaviors, reducing code duplication in P4Runtime, gNMI and gNOI drivers.
Change-Id: I1a46352bbbef1e0d24042f169ae8ba580202944f
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
deleted file mode 100644
index 452e978..0000000
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.api;
-
-import com.google.common.annotations.Beta;
-import org.onosproject.grpc.api.GrpcClientKey;
-import org.onosproject.net.DeviceId;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URLDecoder;
-
-import static java.lang.String.format;
-
-/**
- * Key that uniquely identifies a P4Runtime client.
- */
-@Beta
-public final class P4RuntimeClientKey extends GrpcClientKey {
-
- private static final String DEVICE_ID_PARAM = "device_id=";
-
- private static final String P4RUNTIME = "P4Runtime";
- private final long p4DeviceId;
-
- /**
- * Creates a new client key. The server URI is expected to carry the
- * P4runtime server-internal 'device_id' as a param in the query string. For
- * example, grpc://10.0.0.1:5001?device_id=1
- *
- * @param deviceId ONOS device ID
- * @param serverUri P4Runtime server URI
- */
- public P4RuntimeClientKey(DeviceId deviceId, URI serverUri) {
- super(P4RUNTIME, deviceId, serverUri);
- this.p4DeviceId = extractP4DeviceId(serverUri);
- }
-
- private static Long extractP4DeviceId(URI uri) {
- String[] segments = uri.getRawQuery().split("&");
- try {
- for (String s : segments) {
- if (s.startsWith(DEVICE_ID_PARAM)) {
- return Long.parseUnsignedLong(
- URLDecoder.decode(
- s.substring(DEVICE_ID_PARAM.length()), "utf-8"));
- }
- }
- } catch (UnsupportedEncodingException e) {
- throw new IllegalArgumentException(format(
- "Unable to decode P4Runtime-interval device_id from URI %s: %s",
- uri, e.toString()));
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException(format(
- "Invalid P4Runtime-interval device_id in URI %s: %s",
- uri, e.toString()));
- }
- throw new IllegalArgumentException(format(
- "Missing P4Runtime-interval device_id in URI %s",
- uri));
- }
-
- /**
- * Returns the P4Runtime server-internal device ID.
- *
- * @return P4Runtime server-internal device ID
- */
- public long p4DeviceId() {
- return p4DeviceId;
- }
-}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 18e925a..fec4389 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -25,7 +25,7 @@
*/
@Beta
public interface P4RuntimeController
- extends GrpcClientController<P4RuntimeClientKey, P4RuntimeClient>,
+ extends GrpcClientController<P4RuntimeClient>,
ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java
index 81928c1..2d9cb94 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java
@@ -36,27 +36,30 @@
* blob") to be used in the P4Runtime's {@code SetPipelineConfig} message.
* Returns true if the operations was successful, false otherwise.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @param pipeconf pipeconf
* @param deviceData target-specific data
* @return completable future, true if the operations was successful, false
* otherwise.
*/
CompletableFuture<Boolean> setPipelineConfig(
- PiPipeconf pipeconf, ByteBuffer deviceData);
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData);
/**
- * Same as {@link #setPipelineConfig(PiPipeconf, ByteBuffer)}, but blocks
- * execution.
+ * Same as {@link #setPipelineConfig(long, PiPipeconf, ByteBuffer)}, but
+ * blocks execution.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @param pipeconf pipeconf
* @param deviceData target-specific data
* @return true if the operations was successful, false otherwise.
*/
default boolean setPipelineConfigSync(
- PiPipeconf pipeconf, ByteBuffer deviceData) {
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData) {
checkNotNull(pipeconf);
checkNotNull(deviceData);
- return Futures.getUnchecked(setPipelineConfig(pipeconf, deviceData));
+ return Futures.getUnchecked(setPipelineConfig(
+ p4DeviceId, pipeconf, deviceData));
}
/**
@@ -66,48 +69,53 @@
* P4Info and target-specific data for comparison.
* <p>
* This method is expected to return {@code true} if invoked after calling
- * {@link #setPipelineConfig(PiPipeconf, ByteBuffer)} with the same
+ * {@link #setPipelineConfig(long, PiPipeconf, ByteBuffer)} with the same
* parameters.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @param pipeconf pipeconf
* @param deviceData target-specific data
* @return completable future, true if the device has the given pipeconf
* set, false otherwise.
*/
CompletableFuture<Boolean> isPipelineConfigSet(
- PiPipeconf pipeconf, ByteBuffer deviceData);
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData);
/**
- * Same as {@link #isPipelineConfigSet(PiPipeconf, ByteBuffer)} but blocks
- * execution.
+ * Same as {@link #isPipelineConfigSet(long, PiPipeconf, ByteBuffer)} but
+ * blocks execution.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @param pipeconf pipeconf
* @param deviceData target-specific data
* @return true if the device has the given pipeconf set, false otherwise.
*/
default boolean isPipelineConfigSetSync(
- PiPipeconf pipeconf, ByteBuffer deviceData) {
- return Futures.getUnchecked(isPipelineConfigSet(pipeconf, deviceData));
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData) {
+ return Futures.getUnchecked(isPipelineConfigSet(
+ p4DeviceId, pipeconf, deviceData));
}
/**
* Returns true if the device has a pipeline config set, false otherwise.
* <p>
* This method is expected to return {@code true} if invoked after
- * successfully calling {@link #setPipelineConfig(PiPipeconf, ByteBuffer)}
- * with any parameter.
+ * successfully calling {@link #setPipelineConfig(long, PiPipeconf,
+ * ByteBuffer)} with any parameter.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @return completable future, true if the device has a pipeline config set,
* false otherwise.
*/
- CompletableFuture<Boolean> isAnyPipelineConfigSet();
+ CompletableFuture<Boolean> isAnyPipelineConfigSet(long p4DeviceId);
/**
- * Same as {@link #isAnyPipelineConfigSet()}, but blocks execution.
+ * Same as {@link #isAnyPipelineConfigSet(long)}, but blocks execution.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @return true if the device has a pipeline config set, false otherwise.
*/
- default boolean isAnyPipelineConfigSetSync() {
- return Futures.getUnchecked(isAnyPipelineConfigSet());
+ default boolean isAnyPipelineConfigSetSync(long p4DeviceId) {
+ return Futures.getUnchecked(isAnyPipelineConfigSet(p4DeviceId));
}
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java
index d4c3f61..b99aef3 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java
@@ -35,12 +35,14 @@
/**
* Returns a new {@link ReadRequest} instance that can bed used to build a
- * batched read request, for the given pipeconf.
+ * batched read request, for the given P4Runtime-internal device ID and
+ * pipeconf.
*
- * @param pipeconf pipeconf
+ * @param p4DeviceId P4Runtime-internal device ID
+ * @param pipeconf pipeconf
* @return new read request
*/
- ReadRequest read(PiPipeconf pipeconf);
+ ReadRequest read(long p4DeviceId, PiPipeconf pipeconf);
/**
* Abstraction of a batched P4Runtime read request. Multiple entities can be
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java
index 555f906..3ef6843 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java
@@ -32,48 +32,58 @@
public interface P4RuntimeStreamClient {
/**
- * Opportunistically opens a session with the server by starting a
- * StreamChannel RPC and sends a {@code MasterArbitrationUpdate} message
- * with the given election ID. The {@code master} boolean flag is used to
- * indicated if we are trying to became master or not. If false, the
- * implementation might delay sending the {@code MasterArbitrationUpdate}
- * message until another node becomes master with a higher election ID.
+ * Opportunistically opens a session with the server for the given
+ * P4Runtime-internal device ID by starting a StreamChannel RPC and sending
+ * a {@code MasterArbitrationUpdate} message with the given election ID. The
+ * {@code master} boolean flag is used to indicate if we are trying to
+ * become master or not. If false, the implementation might delay sending
+ * the {@code MasterArbitrationUpdate} message until another node becomes
+ * master with a higher election ID.
* <p>
* If the server acknowledges this client as master, the {@link
* P4RuntimeController} is expected to generate a {@link
* org.onosproject.net.device.DeviceAgentEvent} with type {@link
* org.onosproject.net.device.DeviceAgentEvent.Type#ROLE_MASTER}.
*
- * @param master true if we are trying to become master
+ * @param p4DeviceId P4Runtime-internal device ID
+ * @param master true if we are trying to become master
* @param electionId election ID
*/
- void setMastership(boolean master, BigInteger electionId);
+ void setMastership(long p4DeviceId, boolean master, BigInteger electionId);
/**
* Returns true if the StreamChannel RPC is active and hence the P4Runtime
- * session is open, false otherwise.
+ * session for the given P4Runtime-internal device ID is open, false
+ * otherwise.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @return boolean
*/
- boolean isSessionOpen();
+ boolean isSessionOpen(long p4DeviceId);
/**
- * Closes the session to the server by terminating the Stream RPC.
- */
- void closeSession();
-
- /**
- * Returns true if this client is master for the server, false otherwise.
+ * Closes the session to the server by terminating the StreamChannel RPC for
+ * the given P4Runtime-internal device ID.
*
+ * @param p4DeviceId P4Runtime-internal device ID
+ */
+ void closeSession(long p4DeviceId);
+
+ /**
+ * Returns true if this client is master for the given P4Runtime-internal
+ * device ID, false otherwise.
+ *
+ * @param p4DeviceId P4Runtime-internal device ID
* @return boolean
*/
- boolean isMaster();
+ boolean isMaster(long p4DeviceId);
/**
- * Sends a packet-out for the given pipeconf.
+ * Sends a packet-out for the given P4Runtime-internal device ID.
*
- * @param packet packet-out operation to be performed by the device
- * @param pipeconf pipeconf currently deployed on the device
+ * @param p4DeviceId P4Runtime-internal device ID
+ * @param packet packet-out operation to be performed by the device
+ * @param pipeconf pipeconf currently deployed on the device
*/
- void packetOut(PiPacketOperation packet, PiPipeconf pipeconf);
+ void packetOut(long p4DeviceId, PiPacketOperation packet, PiPipeconf pipeconf);
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java
index a6f3408..4185f70 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java
@@ -33,12 +33,14 @@
/**
* Returns a new {@link WriteRequest} instance that can be used to build a
- * batched write request, for the given pipeconf.
+ * batched write request, for the given P4Runtime-internal device ID and
+ * pipeconf.
*
- * @param pipeconf pipeconf
+ * @param p4DeviceId P4Runtime-internal device ID
+ * @param pipeconf pipeconf
* @return new write request
*/
- WriteRequest write(PiPipeconf pipeconf);
+ WriteRequest write(long p4DeviceId, PiPipeconf pipeconf);
/**
* Signals the type of write operation for a given PI entity.
@@ -222,8 +224,6 @@
* responce from the device, in the same order they were added to this
* batch.
*
- *
- *
* @return entity update requests
*/
Collection<EntityUpdateRequest> pendingUpdates();
@@ -338,9 +338,9 @@
/**
* Returns the status for this PI entity. If {@link #isSuccess()}
* returns {@code true}, then this method is expected to return {@link
- * EntityUpdateStatus#OK}. If {@link EntityUpdateStatus#OTHER_ERROR}
- * is returned, further details might be provided in {@link
- * #explanation()} and {@link #throwable()}.
+ * EntityUpdateStatus#OK}. If {@link EntityUpdateStatus#OTHER_ERROR} is
+ * returned, further details might be provided in {@link #explanation()}
+ * and {@link #throwable()}.
*
* @return status
*/
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
index d54ef97..653fcfd 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
@@ -16,6 +16,8 @@
package org.onosproject.p4runtime.ctl.client;
+import com.google.common.collect.Maps;
+import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -27,7 +29,6 @@
import org.onosproject.net.pi.runtime.PiPacketOperation;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.p4runtime.api.P4RuntimeClient;
-import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
import p4.v1.P4RuntimeGrpc;
@@ -38,6 +39,7 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -50,6 +52,8 @@
public final class P4RuntimeClientImpl
extends AbstractGrpcClient implements P4RuntimeClient {
+ private static final long DEFAULT_P4_DEVICE_ID = 1;
+
// TODO: consider making timeouts configurable per-device via netcfg
/**
* Timeout in seconds for short/fast RPCs.
@@ -62,94 +66,103 @@
*/
static final int LONG_TIMEOUT_SECONDS = 60;
- private final long p4DeviceId;
private final P4RuntimeControllerImpl controller;
- private final StreamClientImpl streamClient;
private final PipelineConfigClientImpl pipelineConfigClient;
+ private final PiPipeconfService pipeconfService;
+ private final MasterElectionIdStore masterElectionIdStore;
+ private final ConcurrentMap<Long, StreamClientImpl> streamClients = Maps.newConcurrentMap();
/**
* Instantiates a new client with the given arguments.
*
- * @param clientKey client key
+ * @param deviceId device ID
* @param channel gRPC managed channel
- * @param controller P$Runtime controller instance
+ * @param controller P4Runtime controller instance
* @param pipeconfService pipeconf service instance
* @param masterElectionIdStore master election ID store
*/
- public P4RuntimeClientImpl(P4RuntimeClientKey clientKey,
+ public P4RuntimeClientImpl(DeviceId deviceId,
ManagedChannel channel,
P4RuntimeControllerImpl controller,
PiPipeconfService pipeconfService,
MasterElectionIdStore masterElectionIdStore) {
- super(clientKey, channel, true, controller);
+ super(deviceId, channel, true, controller);
checkNotNull(channel);
checkNotNull(controller);
checkNotNull(pipeconfService);
checkNotNull(masterElectionIdStore);
- this.p4DeviceId = clientKey.p4DeviceId();
this.controller = controller;
- this.streamClient = new StreamClientImpl(
- pipeconfService, masterElectionIdStore, this, controller);
+ this.pipeconfService = pipeconfService;
+ this.masterElectionIdStore = masterElectionIdStore;
this.pipelineConfigClient = new PipelineConfigClientImpl(this);
}
@Override
public void shutdown() {
- streamClient.closeSession();
+ streamClients.forEach((p4DeviceId, streamClient) ->
+ streamClient.closeSession(p4DeviceId));
super.shutdown();
}
@Override
public CompletableFuture<Boolean> setPipelineConfig(
- PiPipeconf pipeconf, ByteBuffer deviceData) {
- return pipelineConfigClient.setPipelineConfig(pipeconf, deviceData);
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData) {
+ return pipelineConfigClient.setPipelineConfig(p4DeviceId, pipeconf, deviceData);
}
@Override
public CompletableFuture<Boolean> isPipelineConfigSet(
- PiPipeconf pipeconf, ByteBuffer deviceData) {
- return pipelineConfigClient.isPipelineConfigSet(pipeconf, deviceData);
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData) {
+ return pipelineConfigClient.isPipelineConfigSet(p4DeviceId, pipeconf, deviceData);
}
@Override
- public CompletableFuture<Boolean> isAnyPipelineConfigSet() {
- return pipelineConfigClient.isAnyPipelineConfigSet();
+ public CompletableFuture<Boolean> isAnyPipelineConfigSet(long p4DeviceId) {
+ return pipelineConfigClient.isAnyPipelineConfigSet(p4DeviceId);
}
@Override
- public ReadRequest read(PiPipeconf pipeconf) {
- return new ReadRequestImpl(this, pipeconf);
+ public ReadRequest read(long p4DeviceId, PiPipeconf pipeconf) {
+ return new ReadRequestImpl(this, p4DeviceId, pipeconf);
}
@Override
- public boolean isSessionOpen() {
- return streamClient.isSessionOpen();
+ public boolean isSessionOpen(long p4DeviceId) {
+ return streamClients.containsKey(p4DeviceId) &&
+ streamClients.get(p4DeviceId).isSessionOpen(p4DeviceId);
}
@Override
- public void closeSession() {
- streamClient.closeSession();
+ public void closeSession(long p4DeviceId) {
+ if (streamClients.containsKey(p4DeviceId)) {
+ streamClients.get(p4DeviceId).closeSession(p4DeviceId);
+ }
}
@Override
- public void setMastership(boolean master, BigInteger newElectionId) {
- streamClient.setMastership(master, newElectionId);
+ public void setMastership(long p4DeviceId, boolean master, BigInteger newElectionId) {
+ streamClients.putIfAbsent(p4DeviceId, new StreamClientImpl(
+ pipeconfService, masterElectionIdStore, this, p4DeviceId, controller));
+ streamClients.get(p4DeviceId).setMastership(p4DeviceId, master, newElectionId);
}
@Override
- public boolean isMaster() {
- return streamClient.isMaster();
+ public boolean isMaster(long p4DeviceId) {
+ return streamClients.containsKey(p4DeviceId) &&
+ streamClients.get(p4DeviceId).isMaster(p4DeviceId);
}
@Override
- public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
- streamClient.packetOut(packet, pipeconf);
+ public void packetOut(long p4DeviceId, PiPacketOperation packet, PiPipeconf pipeconf) {
+ if (streamClients.containsKey(p4DeviceId)) {
+ streamClients.get(p4DeviceId).packetOut(p4DeviceId, packet, pipeconf);
+ }
}
@Override
- public WriteRequest write(PiPipeconf pipeconf) {
- return new WriteRequestImpl(this, pipeconf);
+ public WriteRequest write(long p4DeviceId, PiPipeconf pipeconf) {
+ return new WriteRequestImpl(this, p4DeviceId, pipeconf);
}
@Override
@@ -164,14 +177,14 @@
@Override
public void onError(Throwable t) {
- if (Status.fromThrowable(t).getCode() ==
- Status.Code.FAILED_PRECONDITION) {
- // Pipeline not set but service is available.
- future.complete(true);
- } else {
- log.debug("", t);
- }
- future.complete(false);
+ log.debug("", t);
+ // FIXME: The P4Runtime spec is not explicit about error
+ // codes when a pipeline config is not set, which would
+ // be useful here as it's an indication that the
+ // service is available. As a workaround, we simply
+ // check the channel state.
+ future.complete(ConnectivityState.READY.equals(
+ channel.getState(false)));
}
@Override
@@ -179,6 +192,9 @@
// Ignore, unary call.
}
};
+ // Get any p4DeviceId under the control of this client or a default one.
+ final long p4DeviceId = streamClients.isEmpty() ? DEFAULT_P4_DEVICE_ID
+ : streamClients.keySet().iterator().next();
// Use long timeout as the device might return the full P4 blob
// (e.g. server does not support cookie), over a slow network.
execRpc(s -> s.getForwardingPipelineConfig(
@@ -207,15 +223,6 @@
}
/**
- * Returns the P4Runtime-internal device ID associated with this client.
- *
- * @return P4Runtime-internal device ID
- */
- long p4DeviceId() {
- return this.p4DeviceId;
- }
-
- /**
* Returns the ONOS device ID associated with this client.
*
* @return ONOS device ID
@@ -226,14 +233,20 @@
/**
* Returns the election ID last used in a MasterArbitrationUpdate message
- * sent by the client to the server. No guarantees are given that this is
- * the current election ID associated to the session, nor that the server
- * has acknowledged this value as valid.
+ * sent by the client to the server for the given P4Runtime-internal device
+ * ID. No guarantees are given that this is the current election ID
+ * associated to the session, nor that the server has acknowledged this
+ * value as valid.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @return election ID uint128 protobuf message
*/
- P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
- return streamClient.lastUsedElectionId();
+ P4RuntimeOuterClass.Uint128 lastUsedElectionId(long p4DeviceId) {
+ if (streamClients.containsKey(p4DeviceId)) {
+ return streamClients.get(p4DeviceId).lastUsedElectionId();
+ } else {
+ return P4RuntimeOuterClass.Uint128.getDefaultInstance();
+ }
}
/**
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
index b0b6c57..1dffc9c 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
@@ -63,9 +63,9 @@
@Override
public CompletableFuture<Boolean> setPipelineConfig(
- PiPipeconf pipeconf, ByteBuffer deviceData) {
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData) {
- if (!client.isSessionOpen()) {
+ if (!client.isSessionOpen(p4DeviceId)) {
log.warn("Dropping set pipeline config request for {}, session is CLOSED",
client.deviceId());
return completedFuture(false);
@@ -86,8 +86,8 @@
final SetForwardingPipelineConfigRequest requestMsg =
SetForwardingPipelineConfigRequest
.newBuilder()
- .setDeviceId(client.p4DeviceId())
- .setElectionId(client.lastUsedElectionId())
+ .setDeviceId(p4DeviceId)
+ .setElectionId(client.lastUsedElectionId(p4DeviceId))
.setAction(VERIFY_AND_COMMIT)
.setConfig(pipelineConfigMsg)
.build();
@@ -159,15 +159,15 @@
@Override
public CompletableFuture<Boolean> isPipelineConfigSet(
- PiPipeconf pipeconf, ByteBuffer expectedDeviceData) {
- return getPipelineCookieFromServer()
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer expectedDeviceData) {
+ return getPipelineCookieFromServer(p4DeviceId)
.thenApply(cfgFromDevice -> comparePipelineConfig(
pipeconf, expectedDeviceData, cfgFromDevice));
}
@Override
- public CompletableFuture<Boolean> isAnyPipelineConfigSet() {
- return getPipelineCookieFromServer().thenApply(Objects::nonNull);
+ public CompletableFuture<Boolean> isAnyPipelineConfigSet(long p4DeviceId) {
+ return getPipelineCookieFromServer(p4DeviceId).thenApply(Objects::nonNull);
}
private boolean comparePipelineConfig(
@@ -211,11 +211,12 @@
.equals(expectedCfg.getP4Info());
}
- private CompletableFuture<ForwardingPipelineConfig> getPipelineCookieFromServer() {
+ private CompletableFuture<ForwardingPipelineConfig> getPipelineCookieFromServer(
+ long p4DeviceId) {
final GetForwardingPipelineConfigRequest request =
GetForwardingPipelineConfigRequest
.newBuilder()
- .setDeviceId(client.p4DeviceId())
+ .setDeviceId(p4DeviceId)
.setResponseType(COOKIE_ONLY)
.build();
final CompletableFuture<ForwardingPipelineConfig> future = new CompletableFuture<>();
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java
index c85c7e8..29a7169 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java
@@ -51,11 +51,11 @@
private final PiPipeconf pipeconf;
private final P4RuntimeOuterClass.ReadRequest.Builder requestMsg;
- ReadRequestImpl(P4RuntimeClientImpl client, PiPipeconf pipeconf) {
+ ReadRequestImpl(P4RuntimeClientImpl client, long p4DeviceId, PiPipeconf pipeconf) {
this.client = client;
this.pipeconf = pipeconf;
this.requestMsg = P4RuntimeOuterClass.ReadRequest.newBuilder()
- .setDeviceId(client.p4DeviceId());
+ .setDeviceId(p4DeviceId);
}
@Override
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
index 11e91aa..a14f960 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
@@ -55,7 +55,8 @@
/**
* Implementation of P4RuntimeStreamClient. Handles P4Runtime StreamChannel RPC
- * operations, such as arbitration update and packet-in/out.
+ * operations, such as arbitration update and packet-in/out, for a given
+ * P4Runtime-internal device ID.
*/
public final class StreamClientImpl implements P4RuntimeStreamClient {
@@ -87,24 +88,27 @@
PiPipeconfService pipeconfService,
MasterElectionIdStore masterElectionIdStore,
P4RuntimeClientImpl client,
+ long p4DeviceId,
P4RuntimeControllerImpl controller) {
this.client = client;
this.deviceId = client.deviceId();
- this.p4DeviceId = client.p4DeviceId();
+ this.p4DeviceId = p4DeviceId;
this.pipeconfService = pipeconfService;
this.masterElectionIdStore = masterElectionIdStore;
this.controller = controller;
}
@Override
- public boolean isSessionOpen() {
+ public boolean isSessionOpen(long p4DeviceId) {
+ checkArgument(this.p4DeviceId == p4DeviceId);
return streamChannelManager.isOpen();
}
@Override
- public void closeSession() {
+ public void closeSession(long p4DeviceId) {
+ checkArgument(this.p4DeviceId == p4DeviceId);
synchronized (requestedToBeMaster) {
- this.masterElectionIdStore.unsetListener(deviceId);
+ this.masterElectionIdStore.unsetListener(deviceId, p4DeviceId);
streamChannelManager.teardown();
pendingElectionId = null;
requestedToBeMaster.set(false);
@@ -113,14 +117,17 @@
}
@Override
- public void setMastership(boolean master, BigInteger newElectionId) {
+ public void setMastership(long p4DeviceId, boolean master,
+ BigInteger newElectionId) {
+ checkArgument(this.p4DeviceId == p4DeviceId);
checkNotNull(newElectionId);
checkArgument(newElectionId.compareTo(BigInteger.ZERO) > 0,
"newElectionId must be a non zero positive number");
synchronized (requestedToBeMaster) {
requestedToBeMaster.set(master);
pendingElectionId = newElectionId;
- handlePendingElectionId(masterElectionIdStore.get(deviceId));
+ handlePendingElectionId(masterElectionIdStore.get(
+ deviceId, p4DeviceId));
}
}
@@ -151,7 +158,8 @@
> ARBITRATION_TIMEOUT_SECONDS * 1000;
}
if (timeoutExpired) {
- log.warn("{} arbitration timeout expired! Will send pending election ID now...",
+ log.warn("Arbitration timeout expired for {}! " +
+ "Will send pending election ID now...",
deviceId);
}
if (!timeoutExpired &&
@@ -163,35 +171,41 @@
deviceId, masterElectionId, pendingElectionId);
// Will try again as soon as the master election ID store is
// updated...
- masterElectionIdStore.setListener(deviceId, masterElectionIdListener);
+ masterElectionIdStore.setListener(
+ deviceId, p4DeviceId, masterElectionIdListener);
// ..or in ARBITRATION_RETRY_SECONDS at the latest (if we missed
// the store event).
pendingElectionIdRetryTask = SharedScheduledExecutors.newTimeout(
- () -> handlePendingElectionId(masterElectionIdStore.get(deviceId)),
+ () -> handlePendingElectionId(
+ masterElectionIdStore.get(deviceId, p4DeviceId)),
ARBITRATION_RETRY_SECONDS, TimeUnit.SECONDS);
} else {
// Send now.
log.info("Setting mastership on {}... " +
- "master={}, newElectionId={}, masterElectionId={}",
+ "master={}, newElectionId={}, " +
+ "masterElectionId={}, sessionOpen={}",
deviceId, requestedToBeMaster.get(),
- pendingElectionId, masterElectionId);
+ pendingElectionId, masterElectionId,
+ streamChannelManager.isOpen());
sendMasterArbitrationUpdate(pendingElectionId);
pendingElectionId = null;
pendingElectionIdTimestamp = 0;
// No need to listen for master election ID changes.
- masterElectionIdStore.unsetListener(deviceId);
+ masterElectionIdStore.unsetListener(deviceId, p4DeviceId);
}
}
}
@Override
- public boolean isMaster() {
+ public boolean isMaster(long p4DeviceId) {
+ checkArgument(this.p4DeviceId == p4DeviceId);
return isMaster.get();
}
@Override
- public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
- if (!isSessionOpen()) {
+ public void packetOut(long p4DeviceId, PiPacketOperation packet, PiPipeconf pipeconf) {
+ checkArgument(this.p4DeviceId == p4DeviceId);
+ if (!isSessionOpen(p4DeviceId)) {
log.warn("Dropping packet-out request for {}, session is closed",
deviceId);
return;
@@ -293,7 +307,7 @@
// and that otherwise would not be aware of changes, keeping their
// pending mastership operations forever.
final BigInteger masterElectionId = uint128ToBigInteger(msg.getElectionId());
- masterElectionIdStore.set(deviceId, masterElectionId);
+ masterElectionIdStore.set(deviceId, p4DeviceId, masterElectionId);
log.debug("Received arbitration update from {}: isMaster={}, masterElectionId={}",
deviceId, isMaster.get(), masterElectionId);
@@ -332,7 +346,7 @@
}
/**
- * A manager for the P4Runtime stream channel that opportunistically creates
+ * A manager for the P4Runtime StreamChannel RPC that opportunistically creates
* new stream RCP stubs (e.g. when one fails because of errors) and posts
* channel events via the P4Runtime controller.
*/
@@ -355,7 +369,7 @@
private void initIfRequired() {
if (requestObserver == null) {
- log.debug("Creating new stream channel for {}...", deviceId);
+ log.debug("Starting new StreamChannel RPC for {}...", deviceId);
open.set(false);
client.execRpcNoTimeout(
s -> requestObserver =
@@ -397,7 +411,7 @@
}
/**
- * Handles messages received from the device on the stream channel.
+ * Handles messages received from the device on the StreamChannel RPC.
*/
private final class InternalStreamResponseObserver
implements StreamObserver<StreamMessageResponse> {
@@ -429,7 +443,7 @@
deviceId, message.getUpdateCase());
}
} catch (Throwable ex) {
- log.error("Exception while processing stream message from {}",
+ log.error("Exception while processing StreamMessageResponse from {}",
deviceId, ex);
}
}
@@ -442,12 +456,12 @@
log.warn("{} is unreachable ({})",
deviceId, sre.getCause().getMessage());
} else {
- log.warn("Error on stream channel for {}: {}",
+ log.warn("Error on StreamChannel RPC for {}: {}",
deviceId, throwable.getMessage());
}
log.debug("", throwable);
} else {
- log.error(format("Exception on stream channel for %s",
+ log.error(format("Exception on StreamChannel RPC for %s",
deviceId), throwable);
}
streamChannelManager.teardown();
@@ -455,7 +469,7 @@
@Override
public void onCompleted() {
- log.warn("Stream channel for {} has completed", deviceId);
+ log.warn("StreamChannel RPC for {} has completed", deviceId);
streamChannelManager.teardown();
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
index fb3a5fb..481cf04 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
@@ -60,11 +60,11 @@
// set once we receive a response from the device.
private final WriteResponseImpl.Builder responseBuilder;
- WriteRequestImpl(P4RuntimeClientImpl client, PiPipeconf pipeconf) {
+ WriteRequestImpl(P4RuntimeClientImpl client, long p4DeviceId, PiPipeconf pipeconf) {
this.client = checkNotNull(client);
this.pipeconf = checkNotNull(pipeconf);
this.requestMsg = P4RuntimeOuterClass.WriteRequest.newBuilder()
- .setDeviceId(client.p4DeviceId());
+ .setDeviceId(p4DeviceId);
this.responseBuilder = WriteResponseImpl.builder(client.deviceId());
}
@@ -160,7 +160,8 @@
checkState(!submitted.getAndSet(true),
"Request has already been submitted, cannot submit again");
final P4RuntimeOuterClass.WriteRequest writeRequest = requestMsg
- .setElectionId(client.lastUsedElectionId())
+ .setElectionId(client.lastUsedElectionId(
+ requestMsg.getDeviceId()))
.build();
log.debug("Sending write request to {} with {} updates...",
client.deviceId(), writeRequest.getUpdatesCount());
@@ -186,11 +187,13 @@
future.complete(responseBuilder.setSuccessAllAndBuild());
}
}
+
@Override
public void onError(Throwable t) {
client.handleRpcError(t, "WRITE");
future.complete(responseBuilder.setErrorsAndBuild(t));
}
+
@Override
public void onCompleted() {
// Nothing to do, unary call.
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
index dff2330..04e39f7 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
@@ -17,6 +17,7 @@
package org.onosproject.p4runtime.ctl.controller;
import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.KryoNamespace;
import org.onosproject.net.DeviceId;
import org.onosproject.store.serializers.KryoNamespaces;
@@ -49,64 +50,77 @@
private static final KryoNamespace SERIALIZER = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
+ .register(Pair.class)
+ .register(Long.class)
.register(BigInteger.class)
.build();
private final Logger log = getLogger(getClass());
- private final EventuallyConsistentMapListener<DeviceId, BigInteger> mapListener =
+ private final EventuallyConsistentMapListener<Pair<DeviceId, Long>, BigInteger> mapListener =
new InternalMapListener();
- private EventuallyConsistentMap<DeviceId, BigInteger> masterElectionIds;
- private ConcurrentMap<DeviceId, MasterElectionIdListener> listeners =
+ private EventuallyConsistentMap<Pair<DeviceId, Long>, BigInteger> masterElectionIds;
+ private ConcurrentMap<Pair<DeviceId, Long>, MasterElectionIdListener> listeners =
Maps.newConcurrentMap();
@Activate
public void activate() {
- this.listeners = Maps.newConcurrentMap();
- this.masterElectionIds = storageService.<DeviceId, BigInteger>eventuallyConsistentMapBuilder()
+ listeners = Maps.newConcurrentMap();
+ masterElectionIds = storageService.<Pair<DeviceId, Long>,
+ BigInteger>eventuallyConsistentMapBuilder()
.withName("p4runtime-master-election-ids")
.withSerializer(SERIALIZER)
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.build();
- this.masterElectionIds.addListener(mapListener);
+ masterElectionIds.addListener(mapListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
- this.masterElectionIds.removeListener(mapListener);
- this.masterElectionIds.destroy();
- this.masterElectionIds = null;
- this.listeners.clear();
- this.listeners = null;
+ masterElectionIds.removeListener(mapListener);
+ masterElectionIds.destroy();
+ masterElectionIds = null;
+ listeners.clear();
+ listeners = null;
log.info("Stopped");
}
@Override
- public void set(DeviceId deviceId, BigInteger electionId) {
+ public void set(DeviceId deviceId, long p4DeviceId, BigInteger electionId) {
checkNotNull(deviceId);
checkNotNull(electionId);
- this.masterElectionIds.put(deviceId, electionId);
+ masterElectionIds.put(Pair.of(deviceId, p4DeviceId), electionId);
}
@Override
- public BigInteger get(DeviceId deviceId) {
+ public BigInteger get(DeviceId deviceId, long p4DeviceId) {
checkNotNull(deviceId);
- return this.masterElectionIds.get(deviceId);
+ return masterElectionIds.get(Pair.of(deviceId, p4DeviceId));
}
@Override
- public void remove(DeviceId deviceId) {
+ public void remove(DeviceId deviceId, long p4DeviceId) {
checkNotNull(deviceId);
- this.masterElectionIds.remove(deviceId);
+ masterElectionIds.remove(Pair.of(deviceId, p4DeviceId));
}
@Override
- public void setListener(DeviceId deviceId, MasterElectionIdListener newListener) {
+    public void removeAll(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        // Entries are keyed by (deviceId, p4DeviceId): drop all for this device.
+        masterElectionIds.keySet().stream()
+                .filter(k -> k.getLeft().equals(deviceId))
+                .forEach(masterElectionIds::remove);
+    }
+
+ @Override
+ public void setListener(DeviceId deviceId, long p4DeviceId,
+ MasterElectionIdListener newListener) {
checkNotNull(deviceId);
checkNotNull(newListener);
- listeners.compute(deviceId, (did, existingListener) -> {
+ listeners.compute(Pair.of(deviceId, p4DeviceId), (x, existingListener) -> {
if (existingListener == null || existingListener == newListener) {
return newListener;
} else {
@@ -117,13 +131,13 @@
}
@Override
- public void unsetListener(DeviceId deviceId) {
- listeners.remove(deviceId);
+ public void unsetListener(DeviceId deviceId, long p4DeviceId) {
+ listeners.remove(Pair.of(deviceId, p4DeviceId));
}
- private class InternalMapListener implements EventuallyConsistentMapListener<DeviceId, BigInteger> {
+ private class InternalMapListener implements EventuallyConsistentMapListener<Pair<DeviceId, Long>, BigInteger> {
@Override
- public void event(EventuallyConsistentMapEvent<DeviceId, BigInteger> event) {
+ public void event(EventuallyConsistentMapEvent<Pair<DeviceId, Long>, BigInteger> event) {
final MasterElectionIdListener listener = listeners.get(event.key());
if (listener == null) {
return;
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
index f5393d1..7e5a49f 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
@@ -21,51 +21,71 @@
import java.math.BigInteger;
/**
- * Store that keeps track of master election IDs for each device.
+ * Store that keeps track of master election IDs for each server (device) and
+ * P4Runtime-internal device ID.
*/
public interface MasterElectionIdStore {
/**
- * Sets the master election ID for the given device.
+ * Sets the master election ID for the given device and P4Runtime-internal
+ * device ID.
*
* @param deviceId device ID
+ * @param p4DeviceId P4Runtime-internal device ID
* @param electionId election ID
*/
- void set(DeviceId deviceId, BigInteger electionId);
+ void set(DeviceId deviceId, long p4DeviceId, BigInteger electionId);
/**
- * Returns the last known master election ID for the given device, or null.
+ * Returns the last known master election ID for the given device and
+ * P4Runtime-internal device ID, or null.
*
- * @param deviceId device ID
+ * @param deviceId device ID
+ * @param p4DeviceId P4Runtime-internal device ID
* @return election ID
*/
- BigInteger get(DeviceId deviceId);
+ BigInteger get(DeviceId deviceId, long p4DeviceId);
/**
- * Removes any state associated with the given device.
+ * Removes any state associated with the given device and P4Runtime-internal
+ * device ID.
+ *
+ * @param deviceId device ID
+ * @param p4DeviceId P4Runtime-internal device ID
+ */
+ void remove(DeviceId deviceId, long p4DeviceId);
+
+ /**
+ * Removes all state associated with the given device.
*
* @param deviceId device ID
*/
- void remove(DeviceId deviceId);
+ void removeAll(DeviceId deviceId);
/**
- * Sets a listener for the given device that will be invoked every time
- * there will be changes to the master election ID.
+ * Sets a listener for the given device and P4Runtime-internal device ID
+ * that will be invoked every time the master election ID for that pair
+ * changes.
*
- * @param deviceId device ID
- * @param listener listener
+ * @param deviceId device ID
+ * @param p4DeviceId P4Runtime-internal device ID
+ * @param listener listener
*/
- void setListener(DeviceId deviceId, MasterElectionIdListener listener);
+ void setListener(DeviceId deviceId, long p4DeviceId,
+ MasterElectionIdListener listener);
/**
- * Unset the listener for the given device.
+ * Unsets the listener for the given device and P4Runtime-internal device
+ * ID.
*
- * @param deviceId device ID
+ * @param deviceId device ID
+ * @param p4DeviceId P4Runtime-internal device ID
*/
- void unsetListener(DeviceId deviceId);
+ void unsetListener(DeviceId deviceId, long p4DeviceId);
/**
- * Listener of master election ID changes for a specific device.
+ * Listener of master election ID changes for a specific device and
+ * P4Runtime-internal device ID.
*/
interface MasterElectionIdListener {
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
index 2256d5e..c188ee4 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
@@ -21,7 +21,6 @@
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.p4runtime.api.P4RuntimeClient;
-import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.api.P4RuntimeEventListener;
@@ -36,7 +35,7 @@
@Component(immediate = true, service = P4RuntimeController.class)
public class P4RuntimeControllerImpl
extends AbstractGrpcClientController
- <P4RuntimeClientKey, P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
+ <P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
implements P4RuntimeController {
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -46,27 +45,21 @@
private MasterElectionIdStore masterElectionIdStore;
public P4RuntimeControllerImpl() {
- super(P4RuntimeEvent.class);
+ super(P4RuntimeEvent.class, "P4Runtime");
}
@Override
- public void removeClient(DeviceId deviceId) {
- super.removeClient(deviceId);
+ public void remove(DeviceId deviceId) {
+ super.remove(deviceId);
// Assuming that when a client is removed, it is done so by all nodes,
// this is the best place to clear master election ID state.
- masterElectionIdStore.remove(deviceId);
- }
-
- @Override
- public void removeClient(P4RuntimeClientKey clientKey) {
- super.removeClient(clientKey);
- masterElectionIdStore.remove(clientKey.deviceId());
+ masterElectionIdStore.removeAll(deviceId);
}
@Override
protected P4RuntimeClient createClientInstance(
- P4RuntimeClientKey clientKey, ManagedChannel channel) {
- return new P4RuntimeClientImpl(clientKey, channel, this,
+ DeviceId deviceId, ManagedChannel channel) {
+ return new P4RuntimeClientImpl(deviceId, channel, this,
pipeconfService, masterElectionIdStore);
}
}
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
index 595e07d..596546b 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
@@ -24,27 +24,32 @@
public class MockMasterElectionIdStore implements MasterElectionIdStore {
@Override
- public void set(DeviceId deviceId, BigInteger electionId) {
+ public void set(DeviceId deviceId, long p4DeviceId, BigInteger electionId) {
}
@Override
- public BigInteger get(DeviceId deviceId) {
+ public BigInteger get(DeviceId deviceId, long p4DeviceId) {
return null;
}
@Override
- public void remove(DeviceId deviceId) {
+ public void remove(DeviceId deviceId, long p4DeviceId) {
}
@Override
- public void setListener(DeviceId deviceId, MasterElectionIdListener listener) {
+ public void removeAll(DeviceId deviceId) {
}
@Override
- public void unsetListener(DeviceId deviceId) {
+ public void setListener(DeviceId deviceId, long p4DeviceId, MasterElectionIdListener listener) {
+
+ }
+
+ @Override
+ public void unsetListener(DeviceId deviceId, long p4DeviceId) {
}
}
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index 4c4dcdf..146f102 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -25,6 +25,7 @@
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.AbstractServerImplBuilder;
import org.easymock.EasyMock;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -44,7 +45,6 @@
import org.onosproject.net.pi.runtime.PiActionProfileGroupId;
import org.onosproject.net.pi.runtime.PiActionProfileMember;
import org.onosproject.net.pi.runtime.PiActionProfileMemberId;
-import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl;
import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
@@ -55,8 +55,6 @@
import p4.v1.P4RuntimeOuterClass.WriteRequest;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collection;
import java.util.List;
@@ -64,7 +62,6 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static java.lang.String.format;
import static org.easymock.EasyMock.niceMock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -109,8 +106,6 @@
private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest";
private static final long DEFAULT_TIMEOUT_TIME = 10;
private static final Uint128 DEFAULT_ELECTION_ID = Uint128.getDefaultInstance();
- private static final String P4R_IP = "127.0.0.1";
- private static final int P4R_PORT = 50010;
private org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl client;
private P4RuntimeControllerImpl controller;
@@ -160,20 +155,23 @@
@Before
- public void setup() throws URISyntaxException {
+ public void setup() {
controller = niceMock(org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl.class);
- P4RuntimeClientKey clientKey = new P4RuntimeClientKey(DEVICE_ID, new URI(
- format("grpc://%s:%d?device_id=%d", P4R_IP, P4R_PORT, P4_DEVICE_ID)));
client = new P4RuntimeClientImpl(
- clientKey, grpcChannel, controller, new MockPipeconfService(),
+ DEVICE_ID, grpcChannel, controller, new MockPipeconfService(),
new MockMasterElectionIdStore());
}
+ @After
+ public void teardown() {
+ client.shutdown();
+ }
+
@Test
public void testInsertPiActionProfileGroup() throws Exception {
CompletableFuture<Void> complete = p4RuntimeServerImpl.expectRequests(1);
- client.write(PIPECONF).insert(GROUP).submitSync();
- assertTrue(client.write(PIPECONF).insert(GROUP).submitSync().isSuccess());
+ client.write(P4_DEVICE_ID, PIPECONF).insert(GROUP).submitSync();
+ assertTrue(client.write(P4_DEVICE_ID, PIPECONF).insert(GROUP).submitSync().isSuccess());
complete.get(DEFAULT_TIMEOUT_TIME, TimeUnit.SECONDS);
WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0);
assertEquals(1, result.getDeviceId());
@@ -202,7 +200,7 @@
@Test
public void testInsertPiActionMembers() throws Exception {
CompletableFuture<Void> complete = p4RuntimeServerImpl.expectRequests(1);
- assertTrue(client.write(PIPECONF).insert(GROUP_MEMBER_INSTANCES)
+ assertTrue(client.write(P4_DEVICE_ID, PIPECONF).insert(GROUP_MEMBER_INSTANCES)
.submitSync().isSuccess());
complete.get(DEFAULT_TIMEOUT_TIME, TimeUnit.SECONDS);
WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0);
@@ -251,7 +249,7 @@
p4RuntimeServerImpl.willReturnReadResult(responses);
CompletableFuture<Void> complete = p4RuntimeServerImpl.expectRequests(1);
- Collection<PiActionProfileGroup> groups = client.read(PIPECONF)
+ Collection<PiActionProfileGroup> groups = client.read(P4_DEVICE_ID, PIPECONF)
.actionProfileGroups(ACT_PROF_ID)
.submitSync().all(PiActionProfileGroup.class);
complete.get(DEFAULT_TIMEOUT_TIME, TimeUnit.SECONDS);
@@ -300,7 +298,7 @@
p4RuntimeServerImpl.willReturnReadResult(responses);
CompletableFuture<Void> complete = p4RuntimeServerImpl.expectRequests(1);
- Collection<PiActionProfileMember> piMembers = client.read(PIPECONF)
+ Collection<PiActionProfileMember> piMembers = client.read(P4_DEVICE_ID, PIPECONF)
.actionProfileMembers(ACT_PROF_ID).submitSync()
.all(PiActionProfileMember.class);
complete.get(DEFAULT_TIMEOUT_TIME, TimeUnit.SECONDS);