Cherry pick gNMI and Stratum related changes to this branch
Cherry picked commits:
20211 Update gNMI version and build script
20247 [ONOS-7829] Implement AbstractGrpcClient and AbstractGrpcClientController
20233 [ONOS-7141][ONOS-7142] Add GnmiClient and GnmiController
20234 Refactor OpenConfig gNMI device description discovery
20260 [ONOS-7831] Implement GnmiHandshaker
20270 Add Stratum driver
Change-Id: I81ad8bce45251af5909cfcac0edbcfd11c8ebf1d
diff --git a/protocols/p4runtime/api/BUCK b/protocols/p4runtime/api/BUCK
index fd05763..c775aea 100644
--- a/protocols/p4runtime/api/BUCK
+++ b/protocols/p4runtime/api/BUCK
@@ -3,6 +3,7 @@
COMPILE_DEPS = [
'//lib:CORE_DEPS',
'//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
+ '//protocols/grpc/api:onos-protocols-grpc-api',
]
TEST_DEPS = [
diff --git a/protocols/p4runtime/api/BUILD b/protocols/p4runtime/api/BUILD
index 531d734..2fb15ba 100644
--- a/protocols/p4runtime/api/BUILD
+++ b/protocols/p4runtime/api/BUILD
@@ -1,4 +1,5 @@
COMPILE_DEPS = CORE_DEPS + [
+ "//protocols/grpc/api:onos-protocols-grpc-api",
"@io_grpc_grpc_java//core",
]
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
index f44c629..7a08668 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
@@ -17,6 +17,7 @@
package org.onosproject.p4runtime.api;
import com.google.common.annotations.Beta;
+import org.onosproject.grpc.api.GrpcClient;
import org.onosproject.net.pi.model.PiActionProfileId;
import org.onosproject.net.pi.model.PiCounterId;
import org.onosproject.net.pi.model.PiMeterId;
@@ -42,7 +43,7 @@
* Client to control a P4Runtime device.
*/
@Beta
-public interface P4RuntimeClient {
+public interface P4RuntimeClient extends GrpcClient {
/**
* Type of write operation.
@@ -70,15 +71,6 @@
boolean isStreamChannelOpen();
/**
- * Shutdowns the client by terminating any active RPC such as the Stream
- * one.
- *
- * @return a completable future to signal the completion of the shutdown
- * procedure
- */
- CompletableFuture<Void> shutdown();
-
- /**
* Sends a master arbitration update to the device with a new election ID
* that is guaranteed to be the highest value between all clients.
*
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
new file mode 100644
index 0000000..70bf33c
--- /dev/null
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.api;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.grpc.api.GrpcClientKey;
+import org.onosproject.net.DeviceId;
+
+import java.util.Objects;
+
+/**
+ * Key that uniquely identifies a P4Runtime client.
+ */
+@Beta
+public final class P4RuntimeClientKey extends GrpcClientKey {
+ private static final String P4R_SERVICE_NAME = "p4runtime";
+ private final long p4DeviceId;
+
+ /**
+ * Creates a new client key.
+ *
+ * @param deviceId ONOS device ID
+ * @param serverAddr P4Runtime server address
+ * @param serverPort P4Runtime server port
+ * @param p4DeviceId P4Runtime server-internal device ID
+ */
+ public P4RuntimeClientKey(DeviceId deviceId, String serverAddr,
+ int serverPort, long p4DeviceId) {
+ super(P4R_SERVICE_NAME, deviceId, serverAddr, serverPort);
+ this.p4DeviceId = p4DeviceId;
+ }
+
+ /**
+ * Returns the P4Runtime server-internal device ID.
+ *
+ * @return P4Runtime server-internal device ID
+ */
+ public long p4DeviceId() {
+ return p4DeviceId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ P4RuntimeClientKey that = (P4RuntimeClientKey) o;
+ return p4DeviceId == that.p4DeviceId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), p4DeviceId);
+ }
+
+ @Override
+ public String toString() {
+ return super.toStringHelper()
+ .add("p4DeviceId", p4DeviceId)
+ .toString();
+ }
+}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 159e313..0bc8ed6 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -18,6 +18,7 @@
import com.google.common.annotations.Beta;
import org.onosproject.event.ListenerService;
+import org.onosproject.grpc.api.GrpcClientController;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.provider.ProviderId;
@@ -27,73 +28,8 @@
*/
@Beta
public interface P4RuntimeController
- extends ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
-
- /**
- * Instantiates a new client to operate on a P4Runtime device identified by
- * the given information. As a result of this method, a {@link
- * P4RuntimeClient} can be later obtained by invoking {@link
- * #getClient(DeviceId)}. Returns true if the client was created and the
- * channel to the device is open, false otherwise.
- * <p>
- * Only one client can exist for the same device ID. Calls to this method
- * are idempotent for the same [device ID, address, port, p4DeviceId]
- * triplet, i.e. returns true if such client already exists but a new one is
- * not created. Throws an {@link IllegalStateException} if a client for
- * device ID already exists but for different [address, port, p4DeviceId].
- *
- * @param deviceId device identifier
- * @param serverAddr address of the P4Runtime server
- * @param serverPort port of the P4Runtime server
- * @param p4DeviceId P4Runtime-specific device identifier
- * @return true if the client was created and the channel to the device is
- * open
- * @throws IllegalStateException if a client already exists for this device
- * ID but for different [address, port,
- * p4DeviceId] triplet.
- */
- boolean createClient(DeviceId deviceId, String serverAddr, int serverPort,
- long p4DeviceId);
-
- /**
- * Returns a client to operate on the given device, or null if a client for
- * such device does not exist in this controller.
- *
- * @param deviceId device identifier
- * @return client instance or null
- */
- P4RuntimeClient getClient(DeviceId deviceId);
-
- /**
- * Removes the client for the given device. If no client exists for the
- * given device identifier, the result is a no-op.
- *
- * @param deviceId device identifier
- */
- void removeClient(DeviceId deviceId);
-
- /**
- * Returns true if a client exists for the given device identifier, false
- * otherwise.
- *
- * @param deviceId device identifier
- * @return true if client exists, false otherwise.
- */
- boolean hasClient(DeviceId deviceId);
-
- /**
- * Returns true if the P4Runtime server running on the given device is
- * reachable, i.e. the channel is open and the server is able to respond to
- * RPCs, false otherwise. Reachability can be tested only if a client was
- * previously created using {@link #createClient(DeviceId, String, int,
- * long)}, otherwise this method returns false.
- *
- * @param deviceId device identifier.
- * @return true if a client was created and is able to contact the P4Runtime
- * server, false otherwise.
- */
- boolean isReachable(DeviceId deviceId);
-
+ extends GrpcClientController<P4RuntimeClientKey, P4RuntimeClient>,
+ ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
/**
* Adds a listener for device agent events for the given provider.
*
diff --git a/protocols/p4runtime/ctl/BUILD b/protocols/p4runtime/ctl/BUILD
index d6c0c7a..66aab43 100644
--- a/protocols/p4runtime/ctl/BUILD
+++ b/protocols/p4runtime/ctl/BUILD
@@ -1,6 +1,7 @@
COMPILE_DEPS = CORE_DEPS + KRYO + [
"//core/store/serializers:onos-core-serializers",
"//protocols/grpc/api:onos-protocols-grpc-api",
+ "//protocols/grpc/ctl:onos-protocols-grpc-ctl",
"//protocols/p4runtime/api:onos-protocols-p4runtime-api",
"//protocols/p4runtime/proto:onos-protocols-p4runtime-proto",
"@com_google_protobuf//:protobuf_java",
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ClientKey.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ClientKey.java
deleted file mode 100644
index cc4bed0..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ClientKey.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl;
-
-import com.google.common.base.MoreObjects;
-import org.onosproject.net.DeviceId;
-
-import java.util.Objects;
-
-/**
- * Key that uniquely identifies a P4Runtime client.
- */
-final class ClientKey {
-
- private final DeviceId deviceId;
- private final String serverAddr;
- private final int serverPort;
- private final long p4DeviceId;
-
- /**
- * Creates a new client key.
- *
- * @param deviceId ONOS device ID
- * @param serverAddr P4Runtime server address
- * @param serverPort P4Runtime server port
- * @param p4DeviceId P4Runtime server-internal device ID
- */
- ClientKey(DeviceId deviceId, String serverAddr, int serverPort, long p4DeviceId) {
- this.deviceId = deviceId;
- this.serverAddr = serverAddr;
- this.serverPort = serverPort;
- this.p4DeviceId = p4DeviceId;
- }
-
- /**
- * Returns the device ID.
- *
- * @return device ID.
- */
- public DeviceId deviceId() {
- return deviceId;
- }
-
- /**
- * Returns the P4Runtime server address.
- *
- * @return P4Runtime server address
- */
- public String serverAddr() {
- return serverAddr;
- }
-
- /**
- * Returns the P4Runtime server port.
- *
- * @return P4Runtime server port
- */
- public int serverPort() {
- return serverPort;
- }
-
- /**
- * Returns the P4Runtime server-internal device ID.
- *
- * @return P4Runtime server-internal device ID
- */
- public long p4DeviceId() {
- return p4DeviceId;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(serverAddr, serverPort, p4DeviceId);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- final ClientKey other = (ClientKey) obj;
- return Objects.equals(this.serverAddr, other.serverAddr)
- && Objects.equals(this.serverPort, other.serverPort)
- && Objects.equals(this.p4DeviceId, other.p4DeviceId);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("deviceId", deviceId)
- .add("serverAddr", serverAddr)
- .add("serverPort", serverPort)
- .add("p4DeviceId", p4DeviceId)
- .toString();
- }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index d806233..d5080ae 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -23,7 +23,6 @@
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
-import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
@@ -32,9 +31,8 @@
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.onlab.osgi.DefaultServiceDirectory;
-import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
-import org.onosproject.net.DeviceId;
+import org.onosproject.grpc.ctl.AbstractGrpcClient;
import org.onosproject.net.pi.model.PiActionProfileId;
import org.onosproject.net.pi.model.PiCounterId;
import org.onosproject.net.pi.model.PiMeterId;
@@ -52,8 +50,8 @@
import org.onosproject.net.pi.runtime.PiTableEntry;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
-import org.slf4j.Logger;
import p4.config.v1.P4InfoOuterClass.P4Info;
import p4.tmp.P4Config;
import p4.v1.P4RuntimeGrpc;
@@ -87,22 +85,13 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.PACKET_REPLICATION_ENGINE_ENTRY;
@@ -115,10 +104,7 @@
/**
* Implementation of a P4Runtime client.
*/
-final class P4RuntimeClientImpl implements P4RuntimeClient {
-
- // Timeout in seconds to obtain the request lock.
- private static final int LOCK_TIMEOUT = 60;
+final class P4RuntimeClientImpl extends AbstractGrpcClient implements P4RuntimeClient {
private static final Metadata.Key<com.google.rpc.Status> STATUS_DETAILS_KEY =
Metadata.Key.of("grpc-status-details-bin",
@@ -132,18 +118,9 @@
WriteOperationType.DELETE, Update.Type.DELETE
);
- private final Logger log = getLogger(getClass());
-
- private final Lock requestLock = new ReentrantLock();
- private final Context.CancellableContext cancellableContext =
- Context.current().withCancellation();
-
- private final DeviceId deviceId;
private final long p4DeviceId;
private final P4RuntimeControllerImpl controller;
private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
- private final ExecutorService executorService;
- private final Executor contextExecutor;
private StreamChannelManager streamChannelManager;
// Used by this client for write requests.
@@ -154,70 +131,22 @@
/**
* Default constructor.
*
- * @param deviceId the ONOS device id
- * @param p4DeviceId the P4 device id
+ * @param clientKey the client key of this client
* @param channel gRPC channel
* @param controller runtime client controller
*/
- P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
+ P4RuntimeClientImpl(P4RuntimeClientKey clientKey, ManagedChannel channel,
P4RuntimeControllerImpl controller) {
- this.deviceId = deviceId;
- this.p4DeviceId = p4DeviceId;
+
+ super(clientKey);
+ this.p4DeviceId = clientKey.p4DeviceId();
this.controller = controller;
- this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
- "onos-p4runtime-client-" + deviceId.toString(), "%d"));
- this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
+
//TODO Investigate use of stub deadlines instead of timeout in supplyInContext
this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
this.streamChannelManager = new StreamChannelManager(channel);
}
- /**
- * Submits a task for async execution via the given executor. All tasks
- * submitted with this method will be executed sequentially.
- */
- private <U> CompletableFuture<U> supplyWithExecutor(
- Supplier<U> supplier, String opDescription, Executor executor) {
- return CompletableFuture.supplyAsync(() -> {
- // TODO: explore a more relaxed locking strategy.
- try {
- if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
- log.error("LOCK TIMEOUT! This is likely a deadlock, "
- + "please debug (executing {})",
- opDescription);
- throw new IllegalThreadStateException("Lock timeout");
- }
- } catch (InterruptedException e) {
- log.warn("Thread interrupted while waiting for lock (executing {})",
- opDescription);
- throw new IllegalStateException(e);
- }
- try {
- return supplier.get();
- } catch (StatusRuntimeException ex) {
- log.warn("Unable to execute {} on {}: {}",
- opDescription, deviceId, ex.toString());
- throw ex;
- } catch (Throwable ex) {
- log.error("Exception in client of {}, executing {}",
- deviceId, opDescription, ex);
- throw ex;
- } finally {
- requestLock.unlock();
- }
- }, executor);
- }
-
- /**
- * Equivalent of supplyWithExecutor using the gRPC context executor of this
- * client, such that if the context is cancelled (e.g. client shutdown) the
- * RPC is automatically cancelled.
- */
- private <U> CompletableFuture<U> supplyInContext(
- Supplier<U> supplier, String opDescription) {
- return supplyWithExecutor(supplier, opDescription, contextExecutor);
- }
-
@Override
public CompletableFuture<Boolean> startStreamChannel() {
return supplyInContext(() -> sendMasterArbitrationUpdate(false),
@@ -225,12 +154,6 @@
}
@Override
- public CompletableFuture<Void> shutdown() {
- return supplyWithExecutor(this::doShutdown, "shutdown",
- SharedExecutors.getPoolThreadExecutor());
- }
-
- @Override
public CompletableFuture<Boolean> becomeMaster() {
return supplyInContext(() -> sendMasterArbitrationUpdate(true),
"becomeMaster");
@@ -1202,19 +1125,9 @@
.build();
}
- private Void doShutdown() {
- log.debug("Shutting down client for {}...", deviceId);
+ protected Void doShutdown() {
streamChannelManager.complete();
- cancellableContext.cancel(new InterruptedException(
- "Requested client shutdown"));
- this.executorService.shutdownNow();
- try {
- executorService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("Executor service didn't shutdown in time.");
- Thread.currentThread().interrupt();
- }
- return null;
+ return super.doShutdown();
}
// Returns the collection of succesfully write entities.
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index a140491..abafaf5 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -19,36 +19,29 @@
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Striped;
import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.netty.NettyChannelBuilder;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.api.GrpcController;
+import org.onosproject.grpc.ctl.AbstractGrpcClientController;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceAgentEvent;
import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.api.P4RuntimeEventListener;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
-import java.io.IOException;
import java.math.BigInteger;
-import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
-import java.util.function.Supplier;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
@@ -58,19 +51,12 @@
@Component(immediate = true)
@Service
public class P4RuntimeControllerImpl
- extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
+ extends AbstractGrpcClientController
+ <P4RuntimeClientKey, P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
implements P4RuntimeController {
- // Getting the pipeline config from the device can take tens of MBs.
- private static final int MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
- private static final int MEGABYTES = 1024 * 1024;
-
private final Logger log = getLogger(getClass());
- private final Map<DeviceId, ClientKey> clientKeys = Maps.newHashMap();
- private final Map<ClientKey, P4RuntimeClient> clients = Maps.newHashMap();
- private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
-
private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
deviceAgentListeners = Maps.newConcurrentMap();
private final Striped<Lock> stripedLocks = Striped.lock(30);
@@ -78,151 +64,28 @@
private DistributedElectionIdGenerator electionIdGenerator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private GrpcController grpcController;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private StorageService storageService;
@Activate
public void activate() {
+ super.activate();
eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
electionIdGenerator = new DistributedElectionIdGenerator(storageService);
log.info("Started");
}
-
@Deactivate
public void deactivate() {
- clientKeys.keySet().forEach(this::removeClient);
- clientKeys.clear();
- clients.clear();
- channelIds.clear();
+ super.deactivate();
deviceAgentListeners.clear();
- grpcController = null;
electionIdGenerator.destroy();
electionIdGenerator = null;
- eventDispatcher.removeSink(P4RuntimeEvent.class);
log.info("Stopped");
}
@Override
- public boolean createClient(DeviceId deviceId, String serverAddr,
- int serverPort, long p4DeviceId) {
- checkNotNull(deviceId);
- checkNotNull(serverAddr);
- checkArgument(serverPort > 0, "Invalid server port");
-
- return withDeviceLock(() -> doCreateClient(
- deviceId, serverAddr, serverPort, p4DeviceId), deviceId);
- }
-
- private boolean doCreateClient(DeviceId deviceId, String serverAddr,
- int serverPort, long p4DeviceId) {
-
- ClientKey clientKey = new ClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
-
- if (clientKeys.containsKey(deviceId)) {
- final ClientKey existingKey = clientKeys.get(deviceId);
- if (clientKey.equals(existingKey)) {
- log.debug("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
- deviceId, serverAddr, serverPort, p4DeviceId);
- return true;
- } else {
- log.info("Requested client for {} with new " +
- "endpoint, removing old client (server={}:{}, " +
- "p4DeviceId={})...",
- deviceId, existingKey.serverAddr(),
- existingKey.serverPort(), existingKey.p4DeviceId());
- doRemoveClient(deviceId);
- }
- }
-
- log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
- deviceId, serverAddr, serverPort, p4DeviceId);
-
- GrpcChannelId channelId = GrpcChannelId.of(
- clientKey.deviceId(), "p4runtime-" + clientKey);
-
- ManagedChannelBuilder channelBuilder = NettyChannelBuilder
- .forAddress(serverAddr, serverPort)
- .maxInboundMessageSize(MAX_INBOUND_MSG_SIZE * MEGABYTES)
- .usePlaintext();
-
- ManagedChannel channel;
- try {
- channel = grpcController.connectChannel(channelId, channelBuilder);
- } catch (IOException e) {
- log.warn("Unable to connect to gRPC server of {}: {}",
- clientKey.deviceId(), e.getMessage());
- return false;
- }
-
- P4RuntimeClient client = new P4RuntimeClientImpl(
- clientKey.deviceId(), clientKey.p4DeviceId(), channel, this);
-
- clientKeys.put(clientKey.deviceId(), clientKey);
- clients.put(clientKey, client);
- channelIds.put(clientKey.deviceId(), channelId);
-
- return true;
- }
-
- @Override
- public P4RuntimeClient getClient(DeviceId deviceId) {
- if (deviceId == null) {
- return null;
- }
- return withDeviceLock(() -> doGetClient(deviceId), deviceId);
- }
-
- private P4RuntimeClient doGetClient(DeviceId deviceId) {
- if (!clientKeys.containsKey(deviceId)) {
- return null;
- } else {
- return clients.get(clientKeys.get(deviceId));
- }
- }
-
- @Override
- public void removeClient(DeviceId deviceId) {
- if (deviceId == null) {
- return;
- }
- withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
- }
-
- private Void doRemoveClient(DeviceId deviceId) {
- if (clientKeys.containsKey(deviceId)) {
- final ClientKey clientKey = clientKeys.get(deviceId);
- clients.get(clientKey).shutdown();
- grpcController.disconnectChannel(channelIds.get(deviceId));
- clientKeys.remove(deviceId);
- clients.remove(clientKey);
- channelIds.remove(deviceId);
- }
- return null;
- }
-
- @Override
- public boolean hasClient(DeviceId deviceId) {
- return clientKeys.containsKey(deviceId);
- }
-
- @Override
- public boolean isReachable(DeviceId deviceId) {
- if (deviceId == null) {
- return false;
- }
- return withDeviceLock(() -> doIsReacheable(deviceId), deviceId);
- }
-
- private boolean doIsReacheable(DeviceId deviceId) {
- // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
- if (!clientKeys.containsKey(deviceId)) {
- log.debug("No client for {}, can't check for reachability", deviceId);
- return false;
- }
- return grpcController.isChannelOpen(channelIds.get(deviceId));
+ protected P4RuntimeClient createClientInstance(P4RuntimeClientKey clientKey, ManagedChannel channel) {
+ return new P4RuntimeClientImpl(clientKey, channel, this);
}
@Override
@@ -244,16 +107,6 @@
});
}
- private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
- final Lock lock = stripedLocks.get(deviceId);
- lock.lock();
- try {
- return task.get();
- } finally {
- lock.unlock();
- }
- }
-
BigInteger newMasterElectionId(DeviceId deviceId) {
return electionIdGenerator.generate(deviceId);
}
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index 36e60ea..301d5e1 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -44,6 +44,7 @@
import org.onosproject.net.pi.runtime.PiActionGroupMember;
import org.onosproject.net.pi.runtime.PiActionGroupMemberId;
import org.onosproject.net.pi.runtime.PiActionParam;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
import p4.v1.P4RuntimeOuterClass.Entity;
@@ -100,6 +101,8 @@
private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest";
private static final long DEFAULT_TIMEOUT_TIME = 10;
private static final Uint128 DEFAULT_ELECTION_ID = Uint128.newBuilder().setLow(1).build();
+ private static final String P4R_IP = "127.0.0.1";
+ private static final int P4R_PORT = 50010;
private P4RuntimeClientImpl client;
private P4RuntimeControllerImpl controller;
@@ -152,9 +155,8 @@
@Before
public void setup() {
controller = niceMock(P4RuntimeControllerImpl.class);
- client = new P4RuntimeClientImpl(DEVICE_ID, P4_DEVICE_ID,
- grpcChannel,
- controller);
+ P4RuntimeClientKey clientKey = new P4RuntimeClientKey(DEVICE_ID, P4R_IP, P4R_PORT, P4_DEVICE_ID);
+ client = new P4RuntimeClientImpl(clientKey, grpcChannel, controller);
client.becomeMaster();
}