Allow sharing the same gRPC channel between clients
This change introduces a refactoring of the gRPC protocol subsystem that
allows the creation of a gRPC chanel independently of the client, while
allowing multiple clients to share the same channel (e.g. as in Stratum
where we use 3 clients).
Moreover, we refactor the P4RuntimeClient API to support multiple
P4Runtime-internal device ID using the same client. While before the
client was associated to one of such ID.
Finally, we provide an abstract implementation for gRPC-based driver
behaviors, reducing code duplication in P4Runtime, gNMI and gNOI drivers.
Change-Id: I1a46352bbbef1e0d24042f169ae8ba580202944f
diff --git a/apps/onlp-demo/BUILD b/apps/onlp-demo/BUILD
index 0bf3560..960a820 100644
--- a/apps/onlp-demo/BUILD
+++ b/apps/onlp-demo/BUILD
@@ -7,7 +7,6 @@
"//protocols/gnmi/stub:onos-protocols-gnmi-stub",
"//protocols/gnmi/api:onos-protocols-gnmi-api",
"//protocols/grpc/api:onos-protocols-grpc-api",
- "//protocols/grpc/proto:onos-protocols-grpc-proto",
]
osgi_jar_with_tests(
diff --git a/apps/onlp-demo/src/main/java/org/onosproject/onlpdemo/OnlpDemoManager.java b/apps/onlp-demo/src/main/java/org/onosproject/onlpdemo/OnlpDemoManager.java
index 6085bbc..60d2a09 100644
--- a/apps/onlp-demo/src/main/java/org/onosproject/onlpdemo/OnlpDemoManager.java
+++ b/apps/onlp-demo/src/main/java/org/onosproject/onlpdemo/OnlpDemoManager.java
@@ -152,7 +152,7 @@
private void fetchData(DeviceId deviceId) {
ImmutableList.Builder<OnlpData> builder = ImmutableList.builder();
- GnmiClient gnmiClient = gnmiController.getClient(deviceId);
+ GnmiClient gnmiClient = gnmiController.get(deviceId);
deviceService.getPorts(deviceId)
.forEach(port -> builder.add(getOnlpData(gnmiClient, port)));
cache.put(deviceId, builder.build());
@@ -193,4 +193,4 @@
}
}
-}
\ No newline at end of file
+}
diff --git a/apps/p4-tutorial/pipeconf/src/main/java/org/onosproject/p4tutorial/pipeconf/PortStatisticsDiscoveryImpl.java b/apps/p4-tutorial/pipeconf/src/main/java/org/onosproject/p4tutorial/pipeconf/PortStatisticsDiscoveryImpl.java
index 48e7b13..4115627 100644
--- a/apps/p4-tutorial/pipeconf/src/main/java/org/onosproject/p4tutorial/pipeconf/PortStatisticsDiscoveryImpl.java
+++ b/apps/p4-tutorial/pipeconf/src/main/java/org/onosproject/p4tutorial/pipeconf/PortStatisticsDiscoveryImpl.java
@@ -44,13 +44,15 @@
import static org.onosproject.net.pi.model.PiCounterType.INDIRECT;
/**
- * Implementation of the PortStatisticsDiscovery behaviour for the mytunnel.p4 program. This behaviour works by using a
- * P4Runtime client to read the values of the ingress/egress port counters defined in the P4 program.
+ * Implementation of the PortStatisticsDiscovery behaviour for the mytunnel.p4
+ * program. This behaviour works by using a P4Runtime client to read the values
+ * of the ingress/egress port counters defined in the P4 program.
*/
public final class PortStatisticsDiscoveryImpl extends AbstractHandlerBehaviour implements PortStatisticsDiscovery {
private static final Logger log = LoggerFactory.getLogger(PortStatisticsDiscoveryImpl.class);
+ private static final long DEFAULT_P4_DEVICE_ID = 1;
private static final PiCounterId INGRESS_COUNTER_ID = PiCounterId.of("c_ingress.rx_port_counter");
private static final PiCounterId EGRESS_COUNTER_ID = PiCounterId.of("c_ingress.tx_port_counter");
@@ -62,7 +64,7 @@
// Get a client for this device.
P4RuntimeController controller = handler().get(P4RuntimeController.class);
- P4RuntimeClient client = controller.getClient(deviceId);
+ P4RuntimeClient client = controller.get(deviceId);
if (client == null) {
log.warn("Unable to find client for {}, aborting operation", deviceId);
return Collections.emptyList();
@@ -98,9 +100,10 @@
.collect(Collectors.toSet());
// Query the device.
- Collection<PiCounterCell> counterEntryResponse = client.read(pipeconf)
- .handles(counterCellHandles).submitSync()
- .all(PiCounterCell.class);
+ Collection<PiCounterCell> counterEntryResponse = client.read(
+ DEFAULT_P4_DEVICE_ID, pipeconf)
+ .handles(counterCellHandles).submitSync()
+ .all(PiCounterCell.class);
// Process response.
counterEntryResponse.forEach(counterCell -> {
diff --git a/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java b/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
index 433be71..ffb7deb 100644
--- a/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
+++ b/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
@@ -17,8 +17,6 @@
import com.google.common.annotations.Beta;
-import java.util.concurrent.CompletableFuture;
-
/**
* Abstraction of handler behaviour used to set-up and tear-down connections
* with a device. A connection is intended as the presence of state (e.g. a
@@ -31,22 +29,24 @@
/**
* Connects to the device, for example by opening the transport session that
* will be later used to send control messages. Returns true if the
- * connection was initiated successfully, false otherwise. The
- * implementation might require probing the device over the network to
- * initiate the connection.
+ * connection was created successfully, false otherwise.
+ * <p>
+ * The implementation should trigger without blocking any necessary
+ * handshake with the device to initialize the connection over the network,
+ * eventually generating a {@link org.onosproject.net.device.DeviceAgentEvent.Type#CHANNEL_OPEN}
+ * event when ready.
* <p>
* When calling this method while a connection to the device already exists,
* the behavior is not defined. For example, some implementations might
* require to first call {@link #disconnect()}, while other might behave as
* a no-op.
*
- * @return CompletableFuture eventually true if a connection was created
- * successfully, false otherwise
+ * @return true if a connection was created successfully, false otherwise
* @throws IllegalStateException if a connection already exists and the
* implementation requires to call {@link
* #disconnect()} first.
*/
- CompletableFuture<Boolean> connect() throws IllegalStateException;
+ boolean connect() throws IllegalStateException;
/**
* Returns true if a connection to the device exists, false otherwise. This
@@ -61,14 +61,19 @@
*
* @return true if the connection is open, false otherwise
*/
- boolean isConnected();
+ boolean hasConnection();
/**
- * Disconnects from the device, for example closing the transport session
+ * Disconnects from the device, for example closing any transport session
* previously opened.
* <p>
* Calling multiple times this method while a connection to the device is
* already closed should result in a no-op.
+ * <p>
+ * If a connection to the device existed and it was open, the implementation
+ * is expected to generate a
+ * {@link org.onosproject.net.device.DeviceAgentEvent.Type#CHANNEL_CLOSED}
+ * event.
*/
void disconnect();
}
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
index 0503d9a..e76d673 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
@@ -241,7 +241,7 @@
log.debug("Starting watchdog task for {} ({})", device.id(), pipeconf.id());
final PiPipelineProgrammable pipelineProg = device.as(PiPipelineProgrammable.class);
final DeviceHandshaker handshaker = device.as(DeviceHandshaker.class);
- if (!handshaker.isConnected()) {
+ if (!handshaker.hasConnection()) {
return false;
}
if (Futures.getUnchecked(pipelineProg.isPipeconfSet(pipeconf))) {
diff --git a/drivers/gnmi/BUILD b/drivers/gnmi/BUILD
index efc3f08..f01a9f3 100644
--- a/drivers/gnmi/BUILD
+++ b/drivers/gnmi/BUILD
@@ -7,7 +7,7 @@
"//protocols/gnmi/stub:onos-protocols-gnmi-stub",
"//protocols/gnmi/api:onos-protocols-gnmi-api",
"//protocols/grpc/api:onos-protocols-grpc-api",
- "//protocols/grpc/proto:onos-protocols-grpc-proto",
+ "//protocols/grpc/utils:onos-protocols-grpc-utils",
]
BUNDLES = [
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java
deleted file mode 100644
index 72233fc..0000000
--- a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.drivers.gnmi;
-
-import com.google.common.base.Strings;
-import org.onosproject.gnmi.api.GnmiClient;
-import org.onosproject.gnmi.api.GnmiClientKey;
-import org.onosproject.gnmi.api.GnmiController;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.config.NetworkConfigService;
-import org.onosproject.net.config.basics.BasicDeviceConfig;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-/**
- * Abstract implementation of a behaviour handler for a gNMI device.
- */
-public class AbstractGnmiHandlerBehaviour extends AbstractHandlerBehaviour {
-
- protected final Logger log = LoggerFactory.getLogger(getClass());
- protected DeviceId deviceId;
- protected DeviceService deviceService;
- protected GnmiClient client;
-
- protected boolean setupBehaviour(String opName) {
- deviceId = handler().data().deviceId();
- deviceService = handler().get(DeviceService.class);
- client = getClientByKey();
- if (client == null) {
- log.warn("Missing client for {}, aborting {}", deviceId, opName);
- return false;
- }
-
- return true;
- }
-
- GnmiClient getClientByKey() {
- final GnmiClientKey clientKey = clientKey();
- if (clientKey == null) {
- return null;
- }
- return handler().get(GnmiController.class).getClient(clientKey);
- }
-
- protected GnmiClientKey clientKey() {
- deviceId = handler().data().deviceId();
-
- final BasicDeviceConfig cfg = handler().get(NetworkConfigService.class)
- .getConfig(deviceId, BasicDeviceConfig.class);
- if (cfg == null || Strings.isNullOrEmpty(cfg.managementAddress())) {
- log.error("Missing or invalid config for {}, cannot derive " +
- "gNMI server endpoints", deviceId);
- return null;
- }
-
- try {
- return new GnmiClientKey(
- deviceId, new URI(cfg.managementAddress()));
- } catch (URISyntaxException e) {
- log.error("Management address of {} is not a valid URI: {}",
- deviceId, cfg.managementAddress());
- return null;
- }
- }
-}
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiHandshaker.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiHandshaker.java
index a6d0c04..82fc40c 100644
--- a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiHandshaker.java
+++ b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiHandshaker.java
@@ -17,33 +17,23 @@
package org.onosproject.drivers.gnmi;
import org.onosproject.gnmi.api.GnmiClient;
-import org.onosproject.gnmi.api.GnmiClientKey;
import org.onosproject.gnmi.api.GnmiController;
+import org.onosproject.grpc.utils.AbstractGrpcHandshaker;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.device.DeviceHandshaker;
import java.util.concurrent.CompletableFuture;
-import static java.util.concurrent.CompletableFuture.completedFuture;
-
/**
* Implementation of DeviceHandshaker for gNMI.
*/
-public class GnmiHandshaker extends AbstractGnmiHandlerBehaviour implements DeviceHandshaker {
+public class GnmiHandshaker
+ extends AbstractGrpcHandshaker<GnmiClient, GnmiController>
+ implements DeviceHandshaker {
- @Override
- public boolean isReachable() {
- final GnmiClient client = getClientByKey();
- return client != null && client.isServerReachable();
- }
- @Override
- public CompletableFuture<Boolean> probeReachability() {
- final GnmiClient client = getClientByKey();
- if (client == null) {
- return completedFuture(false);
- }
- return client.probeService();
+ public GnmiHandshaker() {
+ super(GnmiController.class);
}
@Override
@@ -65,33 +55,4 @@
public MastershipRole getRole() {
throw new UnsupportedOperationException("Mastership operation not supported");
}
-
- @Override
- public CompletableFuture<Boolean> connect() {
- return CompletableFuture.supplyAsync(this::createClient);
- }
-
- private boolean createClient() {
- GnmiClientKey clientKey = clientKey();
- if (clientKey == null) {
- return false;
- }
- if (!handler().get(GnmiController.class).createClient(clientKey)) {
- log.warn("Unable to create client for {}",
- handler().data().deviceId());
- return false;
- }
- return true;
- }
-
- @Override
- public boolean isConnected() {
- return getClientByKey() != null;
- }
-
- @Override
- public void disconnect() {
- handler().get(GnmiController.class)
- .removeClient(handler().data().deviceId());
- }
}
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiDeviceDescriptionDiscovery.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiDeviceDescriptionDiscovery.java
index a85cdfb..210a237 100644
--- a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiDeviceDescriptionDiscovery.java
+++ b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiDeviceDescriptionDiscovery.java
@@ -23,6 +23,9 @@
import gnmi.Gnmi.GetRequest;
import gnmi.Gnmi.GetResponse;
import org.onlab.packet.ChassisId;
+import org.onosproject.gnmi.api.GnmiClient;
+import org.onosproject.gnmi.api.GnmiController;
+import org.onosproject.grpc.utils.AbstractGrpcHandlerBehaviour;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device;
@@ -48,7 +51,7 @@
* supports the gNMI protocol and Openconfig models.
*/
public class OpenConfigGnmiDeviceDescriptionDiscovery
- extends AbstractGnmiHandlerBehaviour
+ extends AbstractGrpcHandlerBehaviour<GnmiClient, GnmiController>
implements DeviceDescriptionDiscovery {
private static final Logger log = LoggerFactory
@@ -58,6 +61,10 @@
private static final String UNKNOWN = "unknown";
+ public OpenConfigGnmiDeviceDescriptionDiscovery() {
+ super(GnmiController.class);
+ }
+
@Override
public DeviceDescription discoverDeviceDetails() {
return new DefaultDeviceDescription(
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortAdminBehaviour.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortAdminBehaviour.java
index 4f050b0..666d5b1 100644
--- a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortAdminBehaviour.java
+++ b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortAdminBehaviour.java
@@ -18,6 +18,8 @@
import gnmi.Gnmi;
import org.onosproject.gnmi.api.GnmiClient;
+import org.onosproject.gnmi.api.GnmiController;
+import org.onosproject.grpc.utils.AbstractGrpcHandlerBehaviour;
import org.onosproject.net.PortNumber;
import org.onosproject.net.behaviour.PortAdmin;
@@ -29,11 +31,18 @@
* Implementation of PortAdmin for gNMI devices with OpenConfig support.
*/
public class OpenConfigGnmiPortAdminBehaviour
- extends AbstractGnmiHandlerBehaviour
+ extends AbstractGrpcHandlerBehaviour<GnmiClient, GnmiController>
implements PortAdmin {
+ public OpenConfigGnmiPortAdminBehaviour() {
+ super(GnmiController.class);
+ }
+
@Override
public CompletableFuture<Boolean> enable(PortNumber number) {
+ if (!setupBehaviour("enable()")) {
+ return completedFuture(false);
+ }
doEnable(number, true);
// Always returning true is OK assuming this is used only by the
// GeneralDeviceProvider, which ignores the return value and instead
@@ -43,6 +52,9 @@
@Override
public CompletableFuture<Boolean> disable(PortNumber number) {
+ if (!setupBehaviour("disable()")) {
+ return completedFuture(false);
+ }
doEnable(number, false);
return completedFuture(true);
}
@@ -58,11 +70,6 @@
portNumber, deviceId);
return;
}
- final GnmiClient client = getClientByKey();
- if (client == null) {
- log.warn("Cannot update ports on {}, missing gNMI client", deviceId);
- return;
- }
final Gnmi.Path path = Gnmi.Path.newBuilder()
.addElem(Gnmi.PathElem.newBuilder().setName("interfaces").build())
.addElem(Gnmi.PathElem.newBuilder().setName("interface")
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortStatisticsDiscovery.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortStatisticsDiscovery.java
index fa7231f..6e361ff 100644
--- a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortStatisticsDiscovery.java
+++ b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortStatisticsDiscovery.java
@@ -23,6 +23,9 @@
import gnmi.Gnmi.GetResponse;
import gnmi.Gnmi.Path;
import org.apache.commons.lang3.tuple.Pair;
+import org.onosproject.gnmi.api.GnmiClient;
+import org.onosproject.gnmi.api.GnmiController;
+import org.onosproject.grpc.utils.AbstractGrpcHandlerBehaviour;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
@@ -41,13 +44,18 @@
/**
* Behaviour to get port statistics from device via gNMI.
*/
-public class OpenConfigGnmiPortStatisticsDiscovery extends AbstractGnmiHandlerBehaviour
+public class OpenConfigGnmiPortStatisticsDiscovery
+ extends AbstractGrpcHandlerBehaviour<GnmiClient, GnmiController>
implements PortStatisticsDiscovery {
private static final Map<Pair<DeviceId, PortNumber>, Long> PORT_START_TIMES =
Maps.newConcurrentMap();
private static final String LAST_CHANGE = "last-changed";
+ public OpenConfigGnmiPortStatisticsDiscovery() {
+ super(GnmiController.class);
+ }
+
@Override
public Collection<PortStatistics> discoverPortStatistics() {
if (!setupBehaviour("discoverPortStatistics()")) {
diff --git a/drivers/gnoi/BUILD b/drivers/gnoi/BUILD
index 3a710a8..b5355a9 100644
--- a/drivers/gnoi/BUILD
+++ b/drivers/gnoi/BUILD
@@ -6,6 +6,7 @@
"//protocols/gnoi/stub:onos-protocols-gnoi-stub",
"//protocols/gnoi/api:onos-protocols-gnoi-api",
"//protocols/grpc/api:onos-protocols-grpc-api",
+ "//protocols/grpc/utils:onos-protocols-grpc-utils",
]
BUNDLES = [
diff --git a/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/AbstractGnoiHandlerBehaviour.java b/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/AbstractGnoiHandlerBehaviour.java
deleted file mode 100644
index 84043a8..0000000
--- a/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/AbstractGnoiHandlerBehaviour.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.drivers.gnoi;
-
-import com.google.common.base.Strings;
-import org.onosproject.gnoi.api.GnoiClient;
-import org.onosproject.gnoi.api.GnoiClientKey;
-import org.onosproject.gnoi.api.GnoiController;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.config.NetworkConfigService;
-import org.onosproject.net.config.basics.BasicDeviceConfig;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-/**
- * Abstract implementation of a behaviour handler for a gNOI device.
- */
-public class AbstractGnoiHandlerBehaviour extends AbstractHandlerBehaviour {
-
- protected final Logger log = LoggerFactory.getLogger(getClass());
- protected DeviceId deviceId;
- protected GnoiClient client;
-
- protected boolean setupBehaviour(String opName) {
- deviceId = handler().data().deviceId();
- client = getClientByKey();
-
- if (client == null) {
- log.warn("Missing client for {}, aborting {}", deviceId, opName);
- return false;
- }
-
- return true;
- }
-
- GnoiClient getClientByKey() {
- final GnoiClientKey clientKey = clientKey();
- if (clientKey == null) {
- return null;
- }
- return handler().get(GnoiController.class).getClient(clientKey);
- }
-
- protected GnoiClientKey clientKey() {
- deviceId = handler().data().deviceId();
-
- final BasicDeviceConfig cfg = handler().get(NetworkConfigService.class)
- .getConfig(deviceId, BasicDeviceConfig.class);
- if (cfg == null || Strings.isNullOrEmpty(cfg.managementAddress())) {
- log.error("Missing or invalid config for {}, cannot derive " +
- "gNOI server endpoints", deviceId);
- return null;
- }
-
- try {
- return new GnoiClientKey(
- deviceId, new URI(cfg.managementAddress()));
- } catch (URISyntaxException e) {
- log.error("Management address of {} is not a valid URI: {}",
- deviceId, cfg.managementAddress());
- return null;
- }
- }
-}
diff --git a/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiBasicSystemOperationsImpl.java b/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiBasicSystemOperationsImpl.java
index b979546..7876886 100644
--- a/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiBasicSystemOperationsImpl.java
+++ b/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiBasicSystemOperationsImpl.java
@@ -19,9 +19,10 @@
import gnoi.system.SystemOuterClass.RebootMethod;
import gnoi.system.SystemOuterClass.RebootRequest;
import gnoi.system.SystemOuterClass.RebootResponse;
+import org.onosproject.gnoi.api.GnoiClient;
+import org.onosproject.gnoi.api.GnoiController;
+import org.onosproject.grpc.utils.AbstractGrpcHandlerBehaviour;
import org.onosproject.net.behaviour.BasicSystemOperations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
@@ -30,10 +31,12 @@
* devices.
*/
public class GnoiBasicSystemOperationsImpl
- extends AbstractGnoiHandlerBehaviour implements BasicSystemOperations {
+ extends AbstractGrpcHandlerBehaviour<GnoiClient, GnoiController>
+ implements BasicSystemOperations {
- private static final Logger log = LoggerFactory
- .getLogger(GnoiBasicSystemOperationsImpl.class);
+ public GnoiBasicSystemOperationsImpl() {
+ super(GnoiController.class);
+ }
@Override
public CompletableFuture<Boolean> reboot() {
diff --git a/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiDeviceDescriptionDiscovery.java b/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiDeviceDescriptionDiscovery.java
index 3c308d6..2010c53 100644
--- a/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiDeviceDescriptionDiscovery.java
+++ b/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiDeviceDescriptionDiscovery.java
@@ -24,6 +24,7 @@
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
import java.util.Collections;
import java.util.List;
@@ -32,7 +33,8 @@
* Implementation of DeviceDescriptionDiscovery for gNOI devices.
*/
public class GnoiDeviceDescriptionDiscovery
- extends AbstractGnoiHandlerBehaviour implements DeviceDescriptionDiscovery {
+ extends AbstractHandlerBehaviour
+ implements DeviceDescriptionDiscovery {
private static final String UNKNOWN = "unknown";
diff --git a/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiHandshaker.java b/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiHandshaker.java
index 36a4971..1668bcc 100644
--- a/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiHandshaker.java
+++ b/drivers/gnoi/src/main/java/org/onosproject/drivers/gnoi/GnoiHandshaker.java
@@ -17,33 +17,22 @@
package org.onosproject.drivers.gnoi;
import org.onosproject.gnoi.api.GnoiClient;
-import org.onosproject.gnoi.api.GnoiClientKey;
import org.onosproject.gnoi.api.GnoiController;
+import org.onosproject.grpc.utils.AbstractGrpcHandshaker;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.device.DeviceHandshaker;
import java.util.concurrent.CompletableFuture;
-import static java.util.concurrent.CompletableFuture.completedFuture;
-
/**
* Implementation of DeviceHandshaker for gNOI.
*/
-public class GnoiHandshaker extends AbstractGnoiHandlerBehaviour implements DeviceHandshaker {
+public class GnoiHandshaker
+ extends AbstractGrpcHandshaker<GnoiClient, GnoiController>
+ implements DeviceHandshaker {
- @Override
- public boolean isReachable() {
- final GnoiClient client = getClientByKey();
- return client != null && client.isServerReachable();
- }
-
- @Override
- public CompletableFuture<Boolean> probeReachability() {
- final GnoiClient client = getClientByKey();
- if (client == null) {
- return completedFuture(false);
- }
- return client.probeService();
+ public GnoiHandshaker() {
+ super(GnoiController.class);
}
@Override
@@ -65,33 +54,4 @@
public MastershipRole getRole() {
throw new UnsupportedOperationException("Mastership operation not supported");
}
-
- @Override
- public CompletableFuture<Boolean> connect() {
- return CompletableFuture.supplyAsync(this::createClient);
- }
-
- private boolean createClient() {
- GnoiClientKey clientKey = clientKey();
- if (clientKey == null) {
- return false;
- }
- if (!handler().get(GnoiController.class).createClient(clientKey)) {
- log.warn("Unable to create client for {}",
- handler().data().deviceId());
- return false;
- }
- return true;
- }
-
- @Override
- public boolean isConnected() {
- return getClientByKey() != null;
- }
-
- @Override
- public void disconnect() {
- handler().get(GnoiController.class)
- .removeClient(handler().data().deviceId());
- }
}
diff --git a/drivers/p4runtime/BUILD b/drivers/p4runtime/BUILD
index 7f917cd..f97dbd7 100644
--- a/drivers/p4runtime/BUILD
+++ b/drivers/p4runtime/BUILD
@@ -1,5 +1,7 @@
COMPILE_DEPS = CORE_DEPS + KRYO + [
"//core/store/serializers:onos-core-serializers",
+ "//protocols/grpc/api:onos-protocols-grpc-api",
+ "//protocols/grpc/utils:onos-protocols-grpc-utils",
"//protocols/p4runtime/api:onos-protocols-p4runtime-api",
"@io_grpc_grpc_java//core",
]
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
index c71907b..7bbc00b 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
@@ -16,40 +16,31 @@
package org.onosproject.drivers.p4runtime;
-import com.google.common.base.Strings;
-import org.onosproject.net.Device;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.config.NetworkConfigService;
-import org.onosproject.net.config.basics.BasicDeviceConfig;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.grpc.utils.AbstractGrpcHandlerBehaviour;
import org.onosproject.net.pi.model.PiPipeconf;
-import org.onosproject.net.pi.model.PiPipelineInterpreter;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.net.pi.service.PiTranslationService;
import org.onosproject.p4runtime.api.P4RuntimeClient;
-import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.api.P4RuntimeController;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.net.URISyntaxException;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.drivers.p4runtime.P4RuntimeDriverUtils.extractP4DeviceId;
/**
* Abstract implementation of a behaviour handler for a P4Runtime device.
*/
-public class AbstractP4RuntimeHandlerBehaviour extends AbstractHandlerBehaviour {
-
- protected final Logger log = LoggerFactory.getLogger(getClass());
+public abstract class AbstractP4RuntimeHandlerBehaviour
+ extends AbstractGrpcHandlerBehaviour<P4RuntimeClient, P4RuntimeController> {
// Initialized by setupBehaviour()
- protected DeviceId deviceId;
+ protected Long p4DeviceId;
protected PiPipeconf pipeconf;
- protected P4RuntimeClient client;
- protected PiTranslationService translationService;
+ PiTranslationService translationService;
+
+
+ AbstractP4RuntimeHandlerBehaviour() {
+ super(P4RuntimeController.class);
+ }
/**
* Initializes this behaviour attributes. Returns true if the operation was
@@ -59,20 +50,25 @@
* @return true if successful, false otherwise
*/
protected boolean setupBehaviour(String opName) {
- deviceId = handler().data().deviceId();
-
- client = getClientByKey();
- if (client == null) {
- log.warn("Missing client for {}, aborting {}", deviceId, opName);
+ if (!super.setupBehaviour(opName)) {
return false;
}
- PiPipeconfService piPipeconfService = handler().get(PiPipeconfService.class);
- if (!piPipeconfService.getPipeconf(deviceId).isPresent()) {
+ p4DeviceId = extractP4DeviceId(mgmtUriFromNetcfg());
+ if (p4DeviceId == null) {
+ log.warn("Unable to obtain P4Runtime-internal device_id from " +
+ "config of {}, cannot perform {}",
+ deviceId, opName);
+ return false;
+ }
+
+ final PiPipeconfService pipeconfService = handler().get(
+ PiPipeconfService.class);
+ if (!pipeconfService.getPipeconf(deviceId).isPresent()) {
log.warn("Missing pipeconf for {}, cannot perform {}", deviceId, opName);
return false;
}
- pipeconf = piPipeconfService.getPipeconf(deviceId).get();
+ pipeconf = pipeconfService.getPipeconf(deviceId).get();
translationService = handler().get(PiTranslationService.class);
@@ -80,61 +76,6 @@
}
/**
- * Returns an instance of the interpreter implementation for this device,
- * null if an interpreter cannot be retrieved.
- *
- * @return interpreter or null
- */
- PiPipelineInterpreter getInterpreter() {
- final Device device = handler().get(DeviceService.class).getDevice(deviceId);
- if (device == null) {
- log.warn("Unable to find device {}, cannot get interpreter", deviceId);
- return null;
- }
- if (!device.is(PiPipelineInterpreter.class)) {
- log.warn("Unable to get interpreter for {}, missing behaviour",
- deviceId);
- return null;
- }
- return device.as(PiPipelineInterpreter.class);
- }
-
- /**
- * Returns a P4Runtime client previsouly created for this device, null if
- * such client does not exist.
- *
- * @return client or null
- */
- P4RuntimeClient getClientByKey() {
- final P4RuntimeClientKey clientKey = clientKey();
- if (clientKey == null) {
- return null;
- }
- return handler().get(P4RuntimeController.class).getClient(clientKey);
- }
-
- protected P4RuntimeClientKey clientKey() {
- deviceId = handler().data().deviceId();
-
- final BasicDeviceConfig cfg = handler().get(NetworkConfigService.class)
- .getConfig(deviceId, BasicDeviceConfig.class);
- if (cfg == null || Strings.isNullOrEmpty(cfg.managementAddress())) {
- log.error("Missing or invalid config for {}, cannot derive " +
- "P4Runtime server endpoints", deviceId);
- return null;
- }
-
- try {
- return new P4RuntimeClientKey(
- deviceId, new URI(cfg.managementAddress()));
- } catch (URISyntaxException e) {
- log.error("Management address of {} is not a valid URI: {}",
- deviceId, cfg.managementAddress());
- return null;
- }
- }
-
- /**
* Returns the value of the given driver property, if present, otherwise
* returns the given default value.
*
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
index 916a945..9b700ca 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
@@ -16,11 +16,8 @@
package org.onosproject.drivers.p4runtime;
-import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.PiPipelineProgrammable;
import org.onosproject.net.pi.model.PiPipeconf;
-import org.onosproject.p4runtime.api.P4RuntimeClient;
-import org.onosproject.p4runtime.api.P4RuntimeController;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
@@ -49,42 +46,32 @@
@Override
public CompletableFuture<Boolean> setPipeconf(PiPipeconf pipeconf) {
- DeviceId deviceId = handler().data().deviceId();
- P4RuntimeController controller = handler().get(P4RuntimeController.class);
-
- P4RuntimeClient client = controller.getClient(deviceId);
- if (client == null) {
- log.warn("Unable to find client for {}, aborting pipeconf deploy", deviceId);
+ if (!setupBehaviour("setPipeconf()")) {
return completedFuture(false);
}
- ByteBuffer deviceDataBuffer = createDeviceDataBuffer(pipeconf);
+ final ByteBuffer deviceDataBuffer = createDeviceDataBuffer(pipeconf);
if (deviceDataBuffer == null) {
// Hopefully the child class logged the problem.
return completedFuture(false);
}
- return client.setPipelineConfig(pipeconf, deviceDataBuffer);
+ return client.setPipelineConfig(p4DeviceId, pipeconf, deviceDataBuffer);
}
@Override
public CompletableFuture<Boolean> isPipeconfSet(PiPipeconf pipeconf) {
- DeviceId deviceId = handler().data().deviceId();
- P4RuntimeController controller = handler().get(P4RuntimeController.class);
-
- P4RuntimeClient client = controller.getClient(deviceId);
- if (client == null) {
- log.warn("Unable to find client for {}, cannot check if pipeconf is set", deviceId);
+ if (!setupBehaviour("isPipeconfSet()")) {
return completedFuture(false);
}
- ByteBuffer deviceDataBuffer = createDeviceDataBuffer(pipeconf);
+ final ByteBuffer deviceDataBuffer = createDeviceDataBuffer(pipeconf);
if (deviceDataBuffer == null) {
// Hopefully the child class logged the problem.
return completedFuture(false);
}
- return client.isPipelineConfigSet(pipeconf, deviceDataBuffer);
+ return client.isPipelineConfigSet(p4DeviceId, pipeconf, deviceDataBuffer);
}
@Override
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
index d14b917..612394f 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
@@ -131,7 +131,8 @@
}
// Dump groups and members from device for all action profiles.
- final P4RuntimeReadClient.ReadRequest request = client.read(pipeconf);
+ final P4RuntimeReadClient.ReadRequest request = client.read(
+ p4DeviceId, pipeconf);
pipeconf.pipelineModel().actionProfiles()
.stream().map(PiActionProfileModel::id)
.forEach(id -> request.actionProfileGroups(id)
@@ -184,7 +185,7 @@
log.warn("Cleaning up {} action profile groups and " +
"{} members on {}...",
groupHandlesToRemove.size(), memberHandlesToRemove.size(), deviceId);
- client.write(pipeconf)
+ client.write(p4DeviceId, pipeconf)
.delete(groupHandlesToRemove)
.delete(memberHandlesToRemove)
.submit().whenComplete((r, ex) -> {
@@ -244,7 +245,8 @@
// found on the device.
if (!validateGroupMembers(piGroupFromStore, membersOnDevice)) {
log.warn("Group on device {} refers to members that are different " +
- "than those found in translation store: {}", handle);
+ "than those found in translation store: {}",
+ deviceId, handle);
return null;
}
if (mirrorEntry == null) {
@@ -308,7 +310,7 @@
if (members == null) {
return;
}
- final WriteRequest request = client.write(pipeconf);
+ final WriteRequest request = client.write(p4DeviceId, pipeconf);
WRITE_LOCKS.get(deviceId).lock();
try {
if (operation == Operation.APPLY) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeDeviceDescriptionDiscovery.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeDeviceDescriptionDiscovery.java
index 337ce6a..4b3dba9 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeDeviceDescriptionDiscovery.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeDeviceDescriptionDiscovery.java
@@ -24,21 +24,25 @@
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
import org.onosproject.net.device.PortDescription;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
import java.util.Collections;
import java.util.List;
+import static org.onosproject.drivers.p4runtime.P4RuntimeDriverUtils.extractP4DeviceId;
+
/**
* Implementation of DeviceDescriptionDiscovery for P4Runtime devices.
*/
public class P4RuntimeDeviceDescriptionDiscovery
- extends AbstractHandlerBehaviour implements DeviceDescriptionDiscovery {
+ extends AbstractP4RuntimeHandlerBehaviour
+ implements DeviceDescriptionDiscovery {
private static final String UNKNOWN = "unknown";
+ private static final String P4_DEVICE_ID = "p4DeviceId";
@Override
public DeviceDescription discoverDeviceDetails() {
+ final Long p4DeviceId = extractP4DeviceId(mgmtUriFromNetcfg());
return new DefaultDeviceDescription(
data().deviceId().uri(),
Device.Type.SWITCH,
@@ -49,6 +53,8 @@
new ChassisId(),
false,
DefaultAnnotations.builder()
+ .set(P4_DEVICE_ID, p4DeviceId == null
+ ? UNKNOWN : String.valueOf(p4DeviceId))
.set(AnnotationKeys.PROTOCOL, "P4Runtime")
.build());
}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeDriverUtils.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeDriverUtils.java
new file mode 100644
index 0000000..73eed40
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeDriverUtils.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.p4runtime;
+
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.pi.model.PiPipelineInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
+
+/**
+ * Utility class for the P4Runtime driver.
+ */
+final class P4RuntimeDriverUtils {
+
+ private static final String DEVICE_ID_PARAM = "device_id=";
+
+ private static final Logger log = LoggerFactory.getLogger(P4RuntimeDriverUtils.class);
+
+ private P4RuntimeDriverUtils() {
+ // Hide constructor.
+ }
+
+ /**
+ * Returns an instance of the interpreter implementation for this device,
+ * null if an interpreter cannot be retrieved.
+ *
+ * @param handler driver handler
+ * @return interpreter or null
+ */
+ static PiPipelineInterpreter getInterpreter(DriverHandler handler) {
+ final DeviceId deviceId = handler.data().deviceId();
+ final Device device = handler.get(DeviceService.class).getDevice(deviceId);
+ if (device == null) {
+ log.warn("Unable to find device {}, cannot get interpreter", deviceId);
+ return null;
+ }
+ if (!device.is(PiPipelineInterpreter.class)) {
+ log.warn("Unable to get interpreter for {}, missing behaviour",
+ deviceId);
+ return null;
+ }
+ return device.as(PiPipelineInterpreter.class);
+ }
+
+ static Long extractP4DeviceId(URI uri) {
+ if (uri == null) {
+ return null;
+ }
+ String[] segments = uri.getRawQuery().split("&");
+ try {
+ for (String s : segments) {
+ if (s.startsWith(DEVICE_ID_PARAM)) {
+ return Long.parseUnsignedLong(
+ URLDecoder.decode(
+ s.substring(DEVICE_ID_PARAM.length()), "utf-8"));
+ }
+ }
+ } catch (UnsupportedEncodingException e) {
+ log.error("Unable to decode P4Runtime-internal device_id from URI {}: {}",
+ uri, e.toString());
+ } catch (NumberFormatException e) {
+ log.error("Invalid P4Runtime-internal device_id in URI {}: {}",
+ uri, e.toString());
+ }
+ log.error("Missing P4Runtime-internal device_id in URI {}", uri);
+ return null;
+ }
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index d55dd71..7432d2d 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -58,6 +58,7 @@
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
+import static org.onosproject.drivers.p4runtime.P4RuntimeDriverUtils.getInterpreter;
import static org.onosproject.drivers.p4runtime.P4RuntimeFlowRuleProgrammable.Operation.APPLY;
import static org.onosproject.drivers.p4runtime.P4RuntimeFlowRuleProgrammable.Operation.REMOVE;
import static org.onosproject.net.flow.FlowEntry.FlowEntryState.ADDED;
@@ -175,7 +176,7 @@
log.warn("Found {} inconsistent table entries on {}, removing them...",
inconsistentEntries.size(), deviceId);
// Submit delete request and update mirror when done.
- client.write(pipeconf)
+ client.write(p4DeviceId, pipeconf)
.entities(inconsistentEntries, DELETE)
.submit().whenComplete((response, ex) -> {
if (ex != null) {
@@ -193,7 +194,8 @@
}
private Collection<PiTableEntry> getAllTableEntriesFromDevice() {
- final P4RuntimeReadClient.ReadRequest request = client.read(pipeconf);
+ final P4RuntimeReadClient.ReadRequest request = client.read(
+ p4DeviceId, pipeconf);
// Read entries from all non-constant tables, including default ones.
pipelineModel.tables().stream()
.filter(t -> !t.isConstantTable())
@@ -274,7 +276,7 @@
return Collections.emptyList();
}
// Created batched write request.
- final WriteRequest request = client.write(pipeconf);
+ final WriteRequest request = client.write(p4DeviceId, pipeconf);
// For each rule, translate to PI and append to write request.
final Map<PiHandle, FlowRule> handleToRuleMap = Maps.newHashMap();
final List<FlowRule> skippedRules = Lists.newArrayList();
@@ -433,7 +435,7 @@
}
private PiTableEntry getOriginalDefaultEntry(PiTableId tableId) {
- final PiPipelineInterpreter interpreter = getInterpreter();
+ final PiPipelineInterpreter interpreter = getInterpreter(handler());
if (interpreter == null) {
log.warn("Missing interpreter for {}, cannot get default action",
deviceId);
@@ -483,7 +485,7 @@
.map(id -> PiCounterCellHandle.of(deviceId, id))
.collect(Collectors.toSet());
// FIXME: We might be sending a very large read request...
- return client.read(pipeconf)
+ return client.read(p4DeviceId, pipeconf)
.handles(cellHandles)
.submitSync()
.all(PiCounterCell.class).stream()
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
index 64611fe..e689298 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
@@ -17,24 +17,27 @@
package org.onosproject.drivers.p4runtime;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.grpc.utils.AbstractGrpcHandshaker;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.device.DeviceHandshaker;
import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.p4runtime.api.P4RuntimeClient;
-import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.api.P4RuntimeController;
import java.math.BigInteger;
import java.util.concurrent.CompletableFuture;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.onosproject.drivers.p4runtime.P4RuntimeDriverUtils.extractP4DeviceId;
/**
* Implementation of DeviceHandshaker for P4Runtime.
*/
-public class P4RuntimeHandshaker extends AbstractP4RuntimeHandlerBehaviour implements DeviceHandshaker {
+public class P4RuntimeHandshaker
+ extends AbstractGrpcHandshaker<P4RuntimeClient, P4RuntimeController>
+ implements DeviceHandshaker {
// This is needed to compute an election ID based on mastership term and
// preference. At the time of writing the practical maximum cluster size is
@@ -44,60 +47,35 @@
// election IDs (e.g. two nodes seeing different cluster sizes).
private static final int MAX_CLUSTER_SIZE = 20;
- @Override
- public CompletableFuture<Boolean> connect() {
- return CompletableFuture
- .supplyAsync(this::createClient);
+ private Long p4DeviceId;
+
+ public P4RuntimeHandshaker() {
+ super(P4RuntimeController.class);
}
- private boolean createClient() {
- final P4RuntimeClientKey clientKey = clientKey();
- if (clientKey == null) {
+ @Override
+ protected boolean setupBehaviour(String opName) {
+ if (!super.setupBehaviour(opName)) {
return false;
}
- if (!handler().get(P4RuntimeController.class).createClient(clientKey)) {
- log.debug("Unable to create client for {}", data().deviceId());
+
+ p4DeviceId = extractP4DeviceId(mgmtUriFromNetcfg());
+ if (p4DeviceId == null) {
+ log.warn("Unable to obtain the P4Runtime-internal device_id from " +
+ "config of {}, cannot perform {}",
+ deviceId, opName);
return false;
}
return true;
}
@Override
- public boolean isConnected() {
- // This is based on the client key obtained from the current netcfg. If
- // a client already exists for this device, but the netcfg with the
- // server endpoints has changed, this will return false.
- return getClientByKey() != null;
- }
-
- @Override
- public void disconnect() {
- // This removes clients associated with this device ID, even if the
- // netcfg has changed and so the client key for this device.
- handler().get(P4RuntimeController.class).removeClient(data().deviceId());
- }
-
- @Override
- public boolean isReachable() {
- final P4RuntimeClient client = getClientByKey();
- return client != null && client.isServerReachable();
- }
-
- @Override
- public CompletableFuture<Boolean> probeReachability() {
- final P4RuntimeClient client = getClientByKey();
- if (client == null) {
- return completedFuture(false);
- }
- return client.probeService();
- }
-
- @Override
public boolean isAvailable() {
// To be available, we require a session open (for packet in/out) and a
// pipeline config set.
- final P4RuntimeClient client = getClientByKey();
- if (client == null || !client.isServerReachable() || !client.isSessionOpen()) {
+ if (!setupBehaviour("isAvailable()") ||
+ !client.isServerReachable() ||
+ !client.isSessionOpen(p4DeviceId)) {
return false;
}
// Since we cannot probe the device, we rely on what's known by the
@@ -111,11 +89,12 @@
public CompletableFuture<Boolean> probeAvailability() {
// To be available, we require a session open (for packet in/out) and a
// pipeline config set.
- final P4RuntimeClient client = getClientByKey();
- if (client == null || !client.isServerReachable() || !client.isSessionOpen()) {
+ if (!setupBehaviour("probeAvailability()") ||
+ !client.isServerReachable() ||
+ !client.isSessionOpen(p4DeviceId)) {
return completedFuture(false);
}
- return client.isAnyPipelineConfigSet();
+ return client.isAnyPipelineConfigSet(p4DeviceId);
}
@Override
@@ -125,7 +104,7 @@
}
if (newRole.equals(MastershipRole.NONE)) {
log.info("Notified role NONE, closing session...");
- client.closeSession();
+ client.closeSession(p4DeviceId);
} else {
throw new UnsupportedOperationException(
"Use preference-based way for setting MASTER or STANDBY roles");
@@ -143,19 +122,21 @@
throw new IllegalStateException(
"Cluster too big! Maz size supported is " + MAX_CLUSTER_SIZE);
}
- BigInteger electionId = BigInteger.valueOf(term)
+ final BigInteger electionId = BigInteger.valueOf(term)
.multiply(BigInteger.valueOf(MAX_CLUSTER_SIZE))
.subtract(BigInteger.valueOf(preference));
- client.setMastership(preference == 0, electionId);
+ client.setMastership(p4DeviceId, preference == 0, electionId);
}
@Override
public MastershipRole getRole() {
- final P4RuntimeClient client = getClientByKey();
- if (client == null || !client.isServerReachable() || !client.isSessionOpen()) {
+ if (!setupBehaviour("getRole()") ||
+ !client.isServerReachable() ||
+ !client.isSessionOpen(p4DeviceId)) {
return MastershipRole.NONE;
}
- return client.isMaster() ? MastershipRole.MASTER : MastershipRole.STANDBY;
+ return client.isMaster(p4DeviceId)
+ ? MastershipRole.MASTER : MastershipRole.STANDBY;
}
@Override
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
index 73faf45..796bea6 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
@@ -105,7 +105,7 @@
final PiMeterCellHandle handle = PiMeterCellHandle.of(deviceId, piMeterCellConfig);
ENTRY_LOCKS.getUnchecked(handle).lock();
- final boolean result = client.write(pipeconf)
+ final boolean result = client.write(p4DeviceId, pipeconf)
.modify(piMeterCellConfig).submitSync().isSuccess();
if (result) {
meterMirror.put(handle, piMeterCellConfig);
@@ -129,7 +129,7 @@
meterIds.add(mode.id());
}
- piMeterCellConfigs = client.read(pipeconf)
+ piMeterCellConfigs = client.read(p4DeviceId, pipeconf)
.meterCells(meterIds).submitSync().all(PiMeterCellConfig.class);
Collection<Meter> meters = piMeterCellConfigs.stream()
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMulticastGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMulticastGroupProgrammable.java
index 80c771e..212ada2 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMulticastGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMulticastGroupProgrammable.java
@@ -145,7 +145,7 @@
return;
case MODIFY:
// Since reading multicast groups is not supported yet on
- // PI/Stratum, we cannot trust groupOnDevic) as we don't have a
+ // PI/Stratum, we cannot trust groupOnDevice as we don't have a
// mechanism to enforce consistency of the mirror with the
// device state.
// if (driverBoolProperty(CHECK_MIRROR_BEFORE_UPDATE,
@@ -169,7 +169,8 @@
private boolean writeMcGroupOnDevice(
PiMulticastGroupEntry group, P4RuntimeClient.UpdateType opType) {
- return client.write(pipeconf).entity(group, opType).submitSync().isSuccess();
+ return client.write(p4DeviceId, pipeconf)
+ .entity(group, opType).submitSync().isSuccess();
}
private boolean mcGroupApply(PiMulticastGroupEntryHandle handle,
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
index 9010c51..8a182a7 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
@@ -23,6 +23,8 @@
import java.util.Collection;
+import static org.onosproject.drivers.p4runtime.P4RuntimeDriverUtils.getInterpreter;
+
/**
* Implementation of PacketProgrammable behaviour for P4Runtime.
*/
@@ -37,7 +39,7 @@
return;
}
- final PiPipelineInterpreter interpreter = getInterpreter();
+ final PiPipelineInterpreter interpreter = getInterpreter(handler());
if (interpreter == null) {
// Error logged by getInterpreter().
return;
@@ -47,7 +49,7 @@
Collection<PiPacketOperation> operations = interpreter.mapOutboundPacket(packet);
operations.forEach(piPacketOperation -> {
log.debug("Doing PiPacketOperation {}", piPacketOperation);
- client.packetOut(piPacketOperation, pipeconf);
+ client.packetOut(p4DeviceId, piPacketOperation, pipeconf);
});
} catch (PiPipelineInterpreter.PiInterpreterException e) {
log.error("Unable to translate outbound packet for {} with pipeconf {}: {}",
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeTableStatisticsDiscovery.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeTableStatisticsDiscovery.java
index 1b390ef..8a32232 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeTableStatisticsDiscovery.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeTableStatisticsDiscovery.java
@@ -36,6 +36,7 @@
import java.util.Map;
import static com.google.common.collect.Lists.newArrayList;
+import static org.onosproject.drivers.p4runtime.P4RuntimeDriverUtils.getInterpreter;
/**
* Implementation of behaviour TableStatisticsDiscovery for P4Runtime.
@@ -49,7 +50,7 @@
return Collections.emptyList();
}
FlowRuleService flowService = handler().get(FlowRuleService.class);
- PiPipelineInterpreter interpreter = getInterpreter();
+ PiPipelineInterpreter interpreter = getInterpreter(handler());
PiPipelineModel model = pipeconf.pipelineModel();
List<TableStatisticsEntry> tableStatsList;
diff --git a/drivers/stratum/BUILD b/drivers/stratum/BUILD
index 1d19c3c..a87c97e 100644
--- a/drivers/stratum/BUILD
+++ b/drivers/stratum/BUILD
@@ -4,6 +4,8 @@
"//drivers/gnmi:onos-drivers-gnmi",
"//drivers/gnoi:onos-drivers-gnoi",
"//pipelines/basic:onos-pipelines-basic",
+ "//protocols/grpc/api:onos-protocols-grpc-api",
+ "//protocols/grpc/utils:onos-protocols-grpc-utils",
]
osgi_jar(
diff --git a/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumDeviceDescriptionDiscovery.java b/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumDeviceDescriptionDiscovery.java
index dbcb193..fdf8173 100644
--- a/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumDeviceDescriptionDiscovery.java
+++ b/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumDeviceDescriptionDiscovery.java
@@ -63,6 +63,9 @@
// Availability is mandated by P4Runtime.
p4Descr.isDefaultAvailable(),
DefaultAnnotations.builder()
+ .putAll(p4Descr.annotations())
+ .putAll(gnmiDescr.annotations())
+ .putAll(gnoiDescr.annotations())
.set(AnnotationKeys.PROTOCOL, format(
"%s, %s, %s",
p4Descr.annotations().value(AnnotationKeys.PROTOCOL),
diff --git a/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumHandshaker.java b/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumHandshaker.java
index d512d3b..ac9a534 100644
--- a/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumHandshaker.java
+++ b/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumHandshaker.java
@@ -38,6 +38,23 @@
}
@Override
+ public boolean connect() {
+ return p4runtime.connect() && gnmi.connect() && gnoi.connect();
+ }
+
+ @Override
+ public boolean hasConnection() {
+ return p4runtime.hasConnection() && gnmi.hasConnection() && gnoi.hasConnection();
+ }
+
+ @Override
+ public void disconnect() {
+ p4runtime.disconnect();
+ gnmi.disconnect();
+ gnoi.disconnect();
+ }
+
+ @Override
public boolean isReachable() {
// Reachability is mainly used for mastership contests and it's a
// prerequisite for availability. We can probably live without gNMI and
@@ -86,23 +103,4 @@
public void removeDeviceAgentListener(ProviderId providerId) {
p4runtime.removeDeviceAgentListener(providerId);
}
-
- @Override
- public CompletableFuture<Boolean> connect() {
- // We should execute connections in parallel.
- return p4runtime.connect().thenCombine(gnmi.connect(), Boolean::logicalAnd)
- .thenCombine(gnoi.connect(), Boolean::logicalAnd);
- }
-
- @Override
- public boolean isConnected() {
- return p4runtime.isConnected() && gnmi.isConnected() && gnoi.isConnected();
- }
-
- @Override
- public void disconnect() {
- p4runtime.disconnect();
- gnmi.disconnect();
- gnoi.disconnect();
- }
}
diff --git a/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/PortStatisticsDiscoveryImpl.java b/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/PortStatisticsDiscoveryImpl.java
index f7cb811..ff48b57 100644
--- a/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/PortStatisticsDiscoveryImpl.java
+++ b/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/PortStatisticsDiscoveryImpl.java
@@ -52,6 +52,7 @@
*/
public class PortStatisticsDiscoveryImpl extends AbstractHandlerBehaviour implements PortStatisticsDiscovery {
+ private static final long DEFAULT_P4_DEVICE_ID = 1;
private static final Map<Pair<DeviceId, PortNumber>, Long> PORT_START_TIMES =
Maps.newConcurrentMap();
@@ -90,7 +91,7 @@
PiPipeconf pipeconf = piPipeconfService.getPipeconf(piPipeconfService.ofDevice(deviceId).get()).get();
P4RuntimeController controller = handler().get(P4RuntimeController.class);
- P4RuntimeClient client = controller.getClient(deviceId);
+ P4RuntimeClient client = controller.get(deviceId);
if (client == null) {
log.warn("Unable to find client for {}, aborting operation", deviceId);
return Collections.emptyList();
@@ -116,7 +117,8 @@
.collect(Collectors.toSet());
// Query the device.
- Collection<PiCounterCell> counterEntryResponse = client.read(pipeconf)
+ Collection<PiCounterCell> counterEntryResponse = client.read(
+ DEFAULT_P4_DEVICE_ID, pipeconf)
.handles(counterCellHandles).submitSync()
.all(PiCounterCell.class);
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java
deleted file mode 100644
index 3a66c18..0000000
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.gnmi.api;
-
-import com.google.common.annotations.Beta;
-import org.onosproject.grpc.api.GrpcClientKey;
-import org.onosproject.net.DeviceId;
-
-import java.net.URI;
-
-/**
- * Key that uniquely identifies a gNMI client.
- */
-@Beta
-public class GnmiClientKey extends GrpcClientKey {
-
- private static final String GNMI = "gNMI";
-
- /**
- * Creates a new gNMI client key.
- *
- * @param deviceId ONOS device ID
- * @param serverUri gNMI server URI
- */
- public GnmiClientKey(DeviceId deviceId, URI serverUri) {
- super(GNMI, deviceId, serverUri);
- }
-}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
index b0e0071..4794570 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
@@ -25,6 +25,6 @@
*/
@Beta
public interface GnmiController
- extends GrpcClientController<GnmiClientKey, GnmiClient>,
+ extends GrpcClientController<GnmiClient>,
ListenerService<GnmiEvent, GnmiEventListener> {
}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
index 8d93a63..47daa83 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -30,8 +30,8 @@
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiClient;
-import org.onosproject.gnmi.api.GnmiClientKey;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
+import org.onosproject.net.DeviceId;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -51,8 +51,9 @@
private GnmiSubscriptionManager subscribeManager;
- GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
- super(clientKey, managedChannel, false, controller);
+ GnmiClientImpl(DeviceId deviceId, ManagedChannel managedChannel,
+ GnmiControllerImpl controller) {
+ super(deviceId, managedChannel, false, controller);
this.subscribeManager =
new GnmiSubscriptionManager(this, deviceId, controller);
}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
index 9c4643a..42d31db 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
@@ -18,11 +18,11 @@
import io.grpc.ManagedChannel;
import org.onosproject.gnmi.api.GnmiClient;
-import org.onosproject.gnmi.api.GnmiClientKey;
import org.onosproject.gnmi.api.GnmiController;
import org.onosproject.gnmi.api.GnmiEvent;
import org.onosproject.gnmi.api.GnmiEventListener;
import org.onosproject.grpc.ctl.AbstractGrpcClientController;
+import org.onosproject.net.DeviceId;
import org.osgi.service.component.annotations.Component;
/**
@@ -31,16 +31,16 @@
@Component(immediate = true, service = GnmiController.class)
public class GnmiControllerImpl
extends AbstractGrpcClientController
- <GnmiClientKey, GnmiClient, GnmiEvent, GnmiEventListener>
+ <GnmiClient, GnmiEvent, GnmiEventListener>
implements GnmiController {
public GnmiControllerImpl() {
- super(GnmiEvent.class);
+ super(GnmiEvent.class, "gNMI");
}
@Override
protected GnmiClient createClientInstance(
- GnmiClientKey clientKey, ManagedChannel channel) {
- return new GnmiClientImpl(clientKey, channel, this);
+ DeviceId deviceId, ManagedChannel channel) {
+ return new GnmiClientImpl(deviceId, channel, this);
}
}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
index 73bb807..a33f4b6 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
@@ -77,7 +77,7 @@
if (existingSubscription.equals(request)) {
// Nothing to do. We are already subscribed for the same
// request.
- log.debug("Ignoring re-subscription to same request",
+ log.debug("Ignoring re-subscription to same request for {}",
deviceId);
return;
}
diff --git a/protocols/gnoi/api/src/main/java/org/onosproject/gnoi/api/GnoiClientKey.java b/protocols/gnoi/api/src/main/java/org/onosproject/gnoi/api/GnoiClientKey.java
deleted file mode 100644
index e246d74..0000000
--- a/protocols/gnoi/api/src/main/java/org/onosproject/gnoi/api/GnoiClientKey.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.gnoi.api;
-
-import com.google.common.annotations.Beta;
-import org.onosproject.grpc.api.GrpcClientKey;
-import org.onosproject.net.DeviceId;
-
-import java.net.URI;
-
-/**
- * Key that uniquely identifies a gNOI client.
- */
-@Beta
-public class GnoiClientKey extends GrpcClientKey {
-
- private static final String GNOI = "gNOI";
-
- /**
- * Creates a new gNOI client key.
- *
- * @param deviceId ONOS device ID
- * @param serverUri gNOI server URI
- */
- public GnoiClientKey(DeviceId deviceId, URI serverUri) {
- super(GNOI, deviceId, serverUri);
- }
-}
diff --git a/protocols/gnoi/api/src/main/java/org/onosproject/gnoi/api/GnoiController.java b/protocols/gnoi/api/src/main/java/org/onosproject/gnoi/api/GnoiController.java
index b601e21..811356a 100644
--- a/protocols/gnoi/api/src/main/java/org/onosproject/gnoi/api/GnoiController.java
+++ b/protocols/gnoi/api/src/main/java/org/onosproject/gnoi/api/GnoiController.java
@@ -24,5 +24,5 @@
*/
@Beta
public interface GnoiController
- extends GrpcClientController<GnoiClientKey, GnoiClient> {
+ extends GrpcClientController<GnoiClient> {
}
diff --git a/protocols/gnoi/ctl/src/main/java/org/onosproject/gnoi/ctl/GnoiClientImpl.java b/protocols/gnoi/ctl/src/main/java/org/onosproject/gnoi/ctl/GnoiClientImpl.java
index f1f1482..84c8364 100644
--- a/protocols/gnoi/ctl/src/main/java/org/onosproject/gnoi/ctl/GnoiClientImpl.java
+++ b/protocols/gnoi/ctl/src/main/java/org/onosproject/gnoi/ctl/GnoiClientImpl.java
@@ -17,20 +17,21 @@
package org.onosproject.gnoi.ctl;
import gnoi.system.SystemGrpc;
-import gnoi.system.SystemOuterClass.TimeRequest;
-import gnoi.system.SystemOuterClass.TimeResponse;
import gnoi.system.SystemOuterClass.RebootRequest;
import gnoi.system.SystemOuterClass.RebootResponse;
+import gnoi.system.SystemOuterClass.TimeRequest;
+import gnoi.system.SystemOuterClass.TimeResponse;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import org.onosproject.gnoi.api.GnoiClient;
-import org.onosproject.gnoi.api.GnoiClientKey;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Implementation of gNOI client.
@@ -40,8 +41,8 @@
private static final int RPC_TIMEOUT_SECONDS = 10;
private static final Logger log = LoggerFactory.getLogger(GnoiClientImpl.class);
- GnoiClientImpl(GnoiClientKey clientKey, ManagedChannel managedChannel, GnoiControllerImpl controller) {
- super(clientKey, managedChannel, false, controller);
+ GnoiClientImpl(DeviceId deviceId, ManagedChannel managedChannel, GnoiControllerImpl controller) {
+ super(deviceId, managedChannel, false, controller);
}
@Override
@@ -123,19 +124,4 @@
SystemGrpc.newStub(channel)
.withDeadlineAfter(RPC_TIMEOUT_SECONDS, TimeUnit.SECONDS)));
}
-
- /**
- * Forces execution of an RPC in a cancellable context with no timeout.
- *
- * @param stubConsumer SystemStub stub consumer
- */
- void execRpcNoTimeout(Consumer<SystemGrpc.SystemStub> stubConsumer) {
- if (log.isTraceEnabled()) {
- log.trace("Executing RPC with no timeout (context deadline {})...",
- context().getDeadline());
- }
- runInCancellableContext(() -> stubConsumer.accept(
- SystemGrpc.newStub(channel)));
- }
-
}
diff --git a/protocols/gnoi/ctl/src/main/java/org/onosproject/gnoi/ctl/GnoiControllerImpl.java b/protocols/gnoi/ctl/src/main/java/org/onosproject/gnoi/ctl/GnoiControllerImpl.java
index 09ef01d..445d521 100644
--- a/protocols/gnoi/ctl/src/main/java/org/onosproject/gnoi/ctl/GnoiControllerImpl.java
+++ b/protocols/gnoi/ctl/src/main/java/org/onosproject/gnoi/ctl/GnoiControllerImpl.java
@@ -19,9 +19,9 @@
import org.onosproject.event.AbstractEvent;
import org.onosproject.event.EventListener;
import org.onosproject.gnoi.api.GnoiClient;
-import org.onosproject.gnoi.api.GnoiClientKey;
import org.onosproject.gnoi.api.GnoiController;
import org.onosproject.grpc.ctl.AbstractGrpcClientController;
+import org.onosproject.net.DeviceId;
import org.osgi.service.component.annotations.Component;
/**
@@ -29,15 +29,15 @@
*/
@Component(immediate = true, service = GnoiController.class)
public class GnoiControllerImpl
- extends AbstractGrpcClientController<GnoiClientKey, GnoiClient, AbstractEvent, EventListener<AbstractEvent>>
+ extends AbstractGrpcClientController<GnoiClient, AbstractEvent, EventListener<AbstractEvent>>
implements GnoiController {
public GnoiControllerImpl() {
- super(AbstractEvent.class);
+ super(AbstractEvent.class, "gNOI");
}
@Override
- protected GnoiClient createClientInstance(GnoiClientKey clientKey, ManagedChannel channel) {
- return new GnoiClientImpl(clientKey, channel, this);
+ protected GnoiClient createClientInstance(DeviceId deviceId, ManagedChannel channel) {
+ return new GnoiClientImpl(deviceId, channel, this);
}
-}
\ No newline at end of file
+}
diff --git a/protocols/grpc/BUILD b/protocols/grpc/BUILD
index f82e1f0..3dd1084 100644
--- a/protocols/grpc/BUILD
+++ b/protocols/grpc/BUILD
@@ -3,9 +3,9 @@
load("//tools/build/bazel:variables.bzl", "PROTOBUF_VERSION")
BUNDLES = [
- "//protocols/grpc/proto:onos-protocols-grpc-proto",
"//protocols/grpc/api:onos-protocols-grpc-api",
"//protocols/grpc/ctl:onos-protocols-grpc-ctl",
+ "//protocols/grpc/utils:onos-protocols-grpc-utils",
# gRPC dependencies (with patched core)
":grpc-core",
":grpc-stub",
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
index ad4324b..42a9248 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
@@ -19,75 +19,84 @@
import com.google.common.annotations.Beta;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
-import io.grpc.StatusRuntimeException;
-import java.util.Map;
+import java.net.URI;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
/**
- * Abstraction of a gRPC controller that stores and manages gRPC channels.
+ * Abstraction of a gRPC controller that creates, stores, and manages gRPC
+ * channels.
*/
@Beta
public interface GrpcChannelController {
- int CONNECTION_TIMEOUT_SECONDS = 20;
+ /**
+ * Creates a gRPC managed channel to the server identified by the given
+ * channel URI. The channel is created using the information contained in the
+ * URI, as such, the URI is expected to have absolute server-based form,
+ * where the scheme can be either {@code grpc:} or {@code grpcs:}, to
+ * indicated respectively a plaintext or secure channel.
+ * <p>
+ * Example of valid URIs are: <pre> {@code
+ * grpc://10.0.0.1:50001
+ * grpcs://10.0.0.1:50001
+ * grpcs://myserver.local:50001
+ * }</pre>
+ * <p>
+ * This method creates and stores the channel instance associating it to the
+ * passed URI, but it does not make any attempt to connect the channel or
+ * verify server reachability.
+ * <p>
+ * If another channel with the same URI already exists, an {@link
+ * IllegalArgumentException} is thrown. To create multiple channels to the
+ * same server-port combination, URI file or query parameters can be used.
+ * For example: <pre> {@code
+ * grpc://10.0.0.1:50001/foo
+ * grpc://10.0.0.1:50001/bar
+ * grpc://10.0.0.1:50001/bar?param=1
+ * grpc://10.0.0.1:50001/bar?param=2
+ * }</pre>
+ * <p>
+ * When creating secure channels (i.e., {@code grpcs:)}, the current
+ * implementation provides encryption but not authentication, any server
+ * certificate, even if insecure, will be accepted.
+ *
+ * @param channelUri channel URI
+ * @return the managed channel created
+ * @throws IllegalArgumentException if a channel with the same channel URI
+ * already exists
+ */
+ ManagedChannel create(URI channelUri);
/**
- * Creates a gRPC managed channel from the given builder and opens the
- * connection. If the connection is successful, returns the managed channel
- * object and stores the channel internally, associated with the given
- * channel ID.
- * <p>
- * This method blocks until the channel is open or a timeout expires. By
- * default the timeout is {@link #CONNECTION_TIMEOUT_SECONDS} seconds. If
- * the timeout expires, a {@link StatusRuntimeException} is thrown. If
- * another channel with the same ID already exists, an {@link
- * IllegalArgumentException} is thrown.
+ * Similar to {@link #create(URI)} but does not create the chanel instance,
+ * instead, it uses the given channel builder to create it. As such, there
+ * is no requirement on the format of the URI, any URI can be used. The
+ * implementation might modify the passed builder for purposes specific to
+ * this controller, such as to enable gRPC message logging.
*
- * @param channelId ID of the channel
+ * @param channelUri URI identifying the channel
* @param channelBuilder builder of the managed channel
* @return the managed channel created
- * @throws StatusRuntimeException if the channel cannot be opened
* @throws IllegalArgumentException if a channel with the same ID already
* exists
*/
- ManagedChannel connectChannel(GrpcChannelId channelId,
- ManagedChannelBuilder<?> channelBuilder);
+ ManagedChannel create(URI channelUri,
+ ManagedChannelBuilder<?> channelBuilder);
/**
- * Closes the gRPC managed channel (i.e., disconnects from the gRPC server)
- * and removes any internal state associated to it.
+ * Closes and destroys the gRPC channel associated to the given URI and
+ * removes any internal state associated to it.
*
- * @param channelId ID of the channel to remove
+ * @param channelUri URI of the channel to remove
*/
- void disconnectChannel(GrpcChannelId channelId);
+ void destroy(URI channelUri);
/**
- * Returns all channels known by this controller, each one mapped to the ID
- * passed at creation time.
+ * If present, returns the channel associated with the given URI.
*
- * @return map of all the channels with their ID as stored in this
- * controller
- */
- Map<GrpcChannelId, ManagedChannel> getChannels();
-
- /**
- * If present, returns the channel associated with the given ID.
- *
- * @param channelId channel ID
+ * @param channelUri channel URI
* @return optional channel
*/
- Optional<ManagedChannel> getChannel(GrpcChannelId channelId);
-
- /**
- * Probes the server at the endpoint of the given channel. Returns true if
- * the server responded to the probe, false otherwise or if the channel does
- * not exist.
- *
- * @param channelId channel ID
- * @return completable future eventually true if the gRPC server responded
- * to the probe; false otherwise
- */
- CompletableFuture<Boolean> probeChannel(GrpcChannelId channelId);
+ Optional<ManagedChannel> get(URI channelUri);
}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java
deleted file mode 100644
index 6db331a..0000000
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.grpc.api;
-
-import com.google.common.annotations.Beta;
-import org.onlab.util.Identifier;
-
-/**
- * gRPC channel identifier, unique in the scope of an ONOS node.
- */
-@Beta
-public final class GrpcChannelId extends Identifier<String> {
-
- private GrpcChannelId(String channelName) {
- super(channelName);
- }
-
- /**
- * Instantiates a new channel ID.
- *
- * @param channelName name of the channel
- * @return channel ID
- */
- public static GrpcChannelId of(String channelName) {
- return new GrpcChannelId(channelName);
- }
-}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
index b47d499..3edb36d 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
@@ -17,6 +17,7 @@
package org.onosproject.grpc.api;
import com.google.common.annotations.Beta;
+import io.grpc.ManagedChannel;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.provider.ProviderId;
@@ -24,76 +25,54 @@
/**
* Abstraction of controller that manages gRPC clients.
*
- * @param <K> the gRPC client key
* @param <C> the gRPC client type
*/
@Beta
-public interface GrpcClientController<K extends GrpcClientKey, C extends GrpcClient> {
+public interface GrpcClientController<C extends GrpcClient> {
/**
- * Instantiates a new client to operate on a gRPC server identified by the
- * given information. As a result of this method, a client can be later
- * obtained by invoking {@link #getClient(DeviceId)}.
+ * Instantiates a new client to operate on the given gRPC channel. Returns
+ * true if the client was created successfully, false otherwise. Clients
+ * are identified by device IDs and once created they can be obtained by
+ * invoking {@link #get(DeviceId)}.
* <p>
- * Upon creation, a connection to the server is automatically started, which
- * blocks execution. If the connection is successful, the client is created
- * and this method returns true, otherwise (e.g., socket error) any state
- * associated with this client is destroyed and returns false.
- * <p>
- * Only one client can exist for the same device ID. Calls to this method
- * are idempotent fot the same client key, i.e. returns true if such client
- * already exists. Otherwise, if a client for the same device ID but
- * different client key already exists, throws an exception.
+ * Only one client can exist for the same device ID. If a client for the
+ * given device ID already exists, throws an exception.
*
- * @param clientKey the client key
- * @return true if the client was created and the channel to the server is
- * open; false otherwise
- * @throws IllegalArgumentException if a client for the same device ID but
- * different client key already exists.
+ * @param deviceId device ID
+ * @param channel gRPC managed channel
+ * @return true if the client was created, false otherwise
+ * @throws IllegalArgumentException if a client for the same device ID
+ * already exists.
*/
- boolean createClient(K clientKey);
+ boolean create(DeviceId deviceId, ManagedChannel channel);
/**
- * Returns the gRPC client previously created for the given device, or null
- * if such client does not exist.
- *
- * @param deviceId the device identifier
- * @return the gRPC client of the device if exists; null otherwise
- */
- C getClient(DeviceId deviceId);
-
- /**
- * Returns the gRPC client previously created for the given client key, or
+ * Returns the gRPC client previously created for the given device ID, or
* null if such client does not exist.
*
- * @param clientKey client key
+ * @param deviceId the device ID
* @return the gRPC client of the device if exists; null otherwise
*/
- C getClient(K clientKey);
+ C get(DeviceId deviceId);
/**
* Removes the gRPC client for the given device and any gRPC channel state
* associated to it. If no client exists for the given device, the result is
* a no-op.
*
- * @param deviceId the device identifier
+ * @param deviceId the device ID
*/
- void removeClient(DeviceId deviceId);
+ void remove(DeviceId deviceId);
/**
- * Similar to {@link #removeClient(DeviceId)} but uses the client key to
- * identify the client to remove.
+ * Adds a listener for device agent events for the given provider. If a
+ * listener already exists for the given device ID and provider ID, then it
+ * will be replaced by the new one.
*
- * @param clientKey the client key
- */
- void removeClient(K clientKey);
-
- /**
- * Adds a listener for device agent events for the given provider.
- *
- * @param deviceId device identifier
+ * @param deviceId device ID
* @param providerId provider ID
- * @param listener the device agent listener
+ * @param listener the device agent listener
*/
void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId,
DeviceAgentListener listener);
@@ -102,7 +81,7 @@
* Removes the listener for device agent events that was previously
* registered for the given provider.
*
- * @param deviceId device identifier
+ * @param deviceId device ID
* @param providerId the provider ID
*/
void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId);
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
deleted file mode 100644
index d1d0b0f..0000000
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.grpc.api;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Objects;
-import org.onosproject.net.DeviceId;
-
-import java.net.URI;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.lang.String.format;
-
-/**
- * Key that uniquely identifies a gRPC client.
- */
-@Beta
-public class GrpcClientKey {
-
- private static final String GRPC = "grpc";
- private static final String GRPCS = "grpcs";
-
- private final String serviceName;
- private final DeviceId deviceId;
- private final URI serverUri;
-
- /**
- * Creates a new client key.
- *
- * @param serviceName gRPC service name of the client
- * @param deviceId ONOS device ID
- * @param serverUri gRPC server URI
- */
- public GrpcClientKey(String serviceName, DeviceId deviceId, URI serverUri) {
- checkNotNull(serviceName);
- checkNotNull(deviceId);
- checkNotNull(serverUri);
- checkArgument(!serviceName.isEmpty(),
- "Service name can not be null");
- checkArgument(serverUri.getScheme().equals(GRPC)
- || serverUri.getScheme().equals(GRPCS),
- format("Server URI scheme must be %s or %s", GRPC, GRPCS));
- checkArgument(!isNullOrEmpty(serverUri.getHost()),
- "Server host address should not be empty");
- checkArgument(serverUri.getPort() > 0 && serverUri.getPort() <= 65535, "Invalid server port");
- this.serviceName = serviceName;
- this.deviceId = deviceId;
- this.serverUri = serverUri;
- }
-
- /**
- * Gets the gRPC service name of the client.
- *
- * @return the service name
- */
- public String serviceName() {
- return serviceName;
- }
-
- /**
- * Gets the device ID.
- *
- * @return the device ID
- */
- public DeviceId deviceId() {
- return deviceId;
- }
-
- /**
- * Returns the gRPC server URI.
- *
- * @return the gRPC server URI.
- */
- public URI serveUri() {
- return serverUri;
- }
-
- /**
- * Returns true if the client requires TLS/SSL, false otherwise.
- *
- * @return boolean
- */
- public boolean requiresSecureChannel() {
- return serverUri.getScheme().equals(GRPCS);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- GrpcClientKey that = (GrpcClientKey) o;
- return Objects.equal(serviceName, that.serviceName) &&
- Objects.equal(deviceId, that.deviceId) &&
- Objects.equal(serverUri, that.serverUri);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(serviceName, deviceId, serverUri);
- }
-
- @Override
- public String toString() {
- return format("%s/%s@%s", deviceId, serviceName, serverUri);
- }
-}
diff --git a/protocols/grpc/ctl/BUILD b/protocols/grpc/ctl/BUILD
index a467398..fa10ff3 100644
--- a/protocols/grpc/ctl/BUILD
+++ b/protocols/grpc/ctl/BUILD
@@ -1,6 +1,5 @@
COMPILE_DEPS = CORE_DEPS + [
"//protocols/grpc/api:onos-protocols-grpc-api",
- "//protocols/grpc/proto:onos-protocols-grpc-proto",
"@io_grpc_grpc_java//core",
"@io_grpc_grpc_java//netty",
"@io_grpc_grpc_java//protobuf-lite",
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
index befa334..933c7ce 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -21,7 +21,6 @@
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import org.onosproject.grpc.api.GrpcClient;
-import org.onosproject.grpc.api.GrpcClientKey;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceAgentEvent;
import org.slf4j.Logger;
@@ -49,25 +48,26 @@
private final AtomicBoolean channelOpen = new AtomicBoolean(false);
/**
- * Creates an new client for the given key and channel. Setting persistent
- * to true avoids the gRPC channel to stay IDLE. The controller instance is
- * needed to propagate channel events.
+ * Creates an new client for the given device and channel. Setting
+ * persistent to true avoids the gRPC channel to go {@link
+ * ConnectivityState#IDLE}. The controller instance is needed to propagate
+ * channel events.
*
- * @param clientKey client key
+ * @param deviceId device ID
* @param channel channel
* @param persistent true if the gRPC should never stay IDLE
* @param controller controller
*/
- protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel,
+ protected AbstractGrpcClient(DeviceId deviceId, ManagedChannel channel,
boolean persistent, AbstractGrpcClientController controller) {
- checkNotNull(clientKey);
+ checkNotNull(deviceId);
checkNotNull(channel);
- this.deviceId = clientKey.deviceId();
+ this.deviceId = deviceId;
this.channel = channel;
this.persistent = persistent;
this.controller = controller;
- setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING);
+ setChannelCallback(ConnectivityState.CONNECTING);
}
@Override
@@ -94,7 +94,7 @@
"ignoring request to shutdown for {}...", deviceId);
return;
}
- log.warn("Shutting down client for {}...", deviceId);
+ log.debug("Shutting down client for {}...", deviceId);
cancellableContext.cancel(new InterruptedException(
"Requested client shutdown"));
}
@@ -140,84 +140,70 @@
opDescription, deviceId), throwable);
}
- private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,
- ConnectivityState sourceState) {
+ private void setChannelCallback(ConnectivityState sourceState) {
if (log.isTraceEnabled()) {
log.trace("Setting channel callback for {} with source state {}...",
deviceId, sourceState);
}
channel.notifyWhenStateChanged(
- sourceState, new ChannelConnectivityCallback(deviceId, channel));
+ sourceState, this::channelStateCallback);
}
/**
- * Runnable task invoked at each change of the channel connectivity state.
- * New callbacks are created as long as the channel is not shut down.
+ * Invoked at each change of the channel connectivity state. New callbacks
+ * are created as long as the channel is not shut down.
*/
- private final class ChannelConnectivityCallback implements Runnable {
-
- private final DeviceId deviceId;
- private final ManagedChannel channel;
-
- private ChannelConnectivityCallback(
- DeviceId deviceId, ManagedChannel channel) {
- this.deviceId = deviceId;
- this.channel = channel;
+ private void channelStateCallback() {
+ final ConnectivityState newState = channel.getState(false);
+ final DeviceAgentEvent.Type eventType;
+ switch (newState) {
+ // On gRPC connectivity states:
+ // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
+ case READY:
+ eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
+ break;
+ case TRANSIENT_FAILURE:
+ eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
+ break;
+ case SHUTDOWN:
+ eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+ break;
+ case IDLE:
+ // IDLE and CONNECTING are transient states that will
+ // eventually move to READY or TRANSIENT_FAILURE. Do not
+ // generate an event for now.
+ if (persistent) {
+ log.debug("Forcing channel for {} to exist state IDLE...", deviceId);
+ channel.getState(true);
+ }
+ eventType = null;
+ break;
+ case CONNECTING:
+ eventType = null;
+ break;
+ default:
+ log.error("Unrecognized connectivity state {}", newState);
+ eventType = null;
}
- @Override
- public void run() {
- final ConnectivityState newState = channel.getState(false);
- final DeviceAgentEvent.Type eventType;
- switch (newState) {
- // On gRPC connectivity states:
- // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
- case READY:
- eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
- break;
- case TRANSIENT_FAILURE:
- eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
- break;
- case SHUTDOWN:
- eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
- break;
- case IDLE:
- // IDLE and CONNECTING are transient states that will
- // eventually move to READY or TRANSIENT_FAILURE. Do not
- // generate an event for now.
- if (persistent) {
- log.debug("Forcing channel for {} to exist state IDLE...", deviceId);
- channel.getState(true);
- }
- eventType = null;
- break;
- case CONNECTING:
- eventType = null;
- break;
- default:
- log.error("Unrecognized connectivity state {}", newState);
- eventType = null;
- }
+ if (log.isTraceEnabled()) {
+ log.trace("Detected channel connectivity change for {}, new state is {}",
+ deviceId, newState);
+ }
- if (log.isTraceEnabled()) {
- log.trace("Detected channel connectivity change for {}, new state is {}",
- deviceId, newState);
+ if (eventType != null) {
+ // Avoid sending consecutive duplicate events.
+ final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
+ final boolean past = channelOpen.getAndSet(present);
+ if (present != past) {
+ log.debug("Notifying event {} for {}", eventType, deviceId);
+ controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
}
+ }
- if (eventType != null) {
- // Avoid sending consecutive duplicate events.
- final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
- final boolean past = channelOpen.getAndSet(present);
- if (present != past) {
- log.debug("Notifying event {} for {}", eventType, deviceId);
- controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
- }
- }
-
- if (newState != ConnectivityState.SHUTDOWN) {
- // Channels never leave SHUTDOWN state, no need for a new callback.
- setChannelCallback(deviceId, channel, newState);
- }
+ if (newState != ConnectivityState.SHUTDOWN) {
+ // Channels never leave SHUTDOWN state, no need for a new callback.
+ setChannelCallback(newState);
}
}
}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
index 3c38209..9fb3625 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
@@ -19,29 +19,19 @@
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Striped;
import io.grpc.ManagedChannel;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NettyChannelBuilder;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.event.Event;
import org.onosproject.event.EventListener;
-import org.onosproject.grpc.api.GrpcChannelController;
-import org.onosproject.grpc.api.GrpcChannelId;
import org.onosproject.grpc.api.GrpcClient;
import org.onosproject.grpc.api.GrpcClientController;
-import org.onosproject.grpc.api.GrpcClientKey;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceAgentEvent;
import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.provider.ProviderId;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
-import javax.net.ssl.SSLException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
@@ -52,40 +42,35 @@
import static org.slf4j.LoggerFactory.getLogger;
/**
- * Abstract class of a gRPC based client controller for specific gRPC client
- * which provides basic gRPC client management and thread safe mechanism.
+ * Abstract class of a controller for gRPC clients which provides means to
+ * create clients, associate device agent listeners to them and register other
+ * event listeners.
*
* @param <C> the gRPC client type
- * @param <K> the key type of the gRPC client
* @param <E> the event type of the gRPC client
* @param <L> the event listener of event {@link E}
*/
public abstract class AbstractGrpcClientController
- <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
+ <C extends GrpcClient, E extends Event, L extends EventListener<E>>
extends AbstractListenerManager<E, L>
- implements GrpcClientController<K, C> {
+ implements GrpcClientController<C> {
/**
* The default max inbound message size (MB).
*/
- private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
- private static final int MEGABYTES = 1024 * 1024;
private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
private final Logger log = getLogger(getClass());
- private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
- private final Map<K, C> clients = Maps.newHashMap();
- private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+ private final Map<DeviceId, C> clients = Maps.newHashMap();
private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
deviceAgentListeners = Maps.newConcurrentMap();
private final Class<E> eventClass;
+ private final String serviceName;
private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected GrpcChannelController grpcChannelController;
-
- public AbstractGrpcClientController(Class<E> eventClass) {
+ public AbstractGrpcClientController(Class<E> eventClass, String serviceName) {
this.eventClass = eventClass;
+ this.serviceName = serviceName;
}
@Activate
@@ -97,162 +82,67 @@
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(eventClass);
- clientKeys.keySet().forEach(this::removeClient);
- clientKeys.clear();
clients.clear();
- channelIds.clear();
deviceAgentListeners.clear();
log.info("Stopped");
}
@Override
- public boolean createClient(K clientKey) {
- checkNotNull(clientKey);
- return withDeviceLock(() -> doCreateClient(clientKey),
- clientKey.deviceId());
+ public boolean create(DeviceId deviceId, ManagedChannel channel) {
+ checkNotNull(deviceId);
+ checkNotNull(channel);
+ return withDeviceLock(() -> doCreateClient(deviceId, channel), deviceId);
}
- private boolean doCreateClient(K clientKey) {
- DeviceId deviceId = clientKey.deviceId();
+ private boolean doCreateClient(DeviceId deviceId, ManagedChannel channel) {
- if (clientKeys.containsKey(deviceId)) {
- final GrpcClientKey existingKey = clientKeys.get(deviceId);
- if (clientKey.equals(existingKey)) {
- log.debug("Not creating {} as it already exists... (key={})",
- clientName(clientKey), clientKey);
- return true;
- } else {
- throw new IllegalArgumentException(format(
- "A client already exists for device %s (%s)",
- deviceId, clientKey));
- }
- }
-
- final String method = clientKey.requiresSecureChannel()
- ? "TLS" : "plaintext TCP";
-
- log.info("Connecting {} client for {} to server at {} using {}...",
- clientKey.serviceName(), deviceId, clientKey.serveUri(), method);
-
- SslContext sslContext = null;
- if (clientKey.requiresSecureChannel()) {
- try {
- // FIXME: Accept any server certificate; this is insecure and
- // should not be used in production
- sslContext = GrpcSslContexts.forClient().trustManager(
- InsecureTrustManagerFactory.INSTANCE).build();
- } catch (SSLException e) {
- log.error("Failed to build SSL Context", e);
- return false;
- }
- }
-
- GrpcChannelId channelId = GrpcChannelId.of(clientKey.toString());
- NettyChannelBuilder channelBuilder = NettyChannelBuilder
- .forAddress(clientKey.serveUri().getHost(),
- clientKey.serveUri().getPort())
- .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
-
- if (sslContext != null) {
- channelBuilder
- .sslContext(sslContext)
- .useTransportSecurity();
- } else {
- channelBuilder.usePlaintext();
- }
-
- final ManagedChannel channel;
-
- try {
- channel = grpcChannelController.connectChannel(channelId, channelBuilder);
- } catch (Throwable e) {
- log.warn("Failed to connect to {} ({}) using {}: {}",
- deviceId, clientKey.serveUri(), method, e.toString());
- log.debug("gRPC client connection exception", e);
- return false;
+ if (clients.containsKey(deviceId)) {
+ throw new IllegalArgumentException(format(
+ "A %s client already exists for %s", serviceName, deviceId));
}
final C client;
try {
- client = createClientInstance(clientKey, channel);
+ client = createClientInstance(deviceId, channel);
} catch (Throwable e) {
- log.error("Exception while creating {}", clientName(clientKey), e);
- grpcChannelController.disconnectChannel(channelId);
+ log.error("Exception while creating {}", clientName(deviceId), e);
return false;
}
if (client == null) {
- log.error("Unable to create {}, implementation returned null... (key={})",
- clientName(clientKey), clientKey);
- grpcChannelController.disconnectChannel(channelId);
+ log.error("Unable to create {}, implementation returned null...",
+ clientName(deviceId));
return false;
}
- clientKeys.put(deviceId, clientKey);
- clients.put(clientKey, client);
- channelIds.put(deviceId, channelId);
-
+ clients.put(deviceId, client);
return true;
}
- protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
+ protected abstract C createClientInstance(DeviceId deviceId, ManagedChannel channel);
@Override
- public C getClient(DeviceId deviceId) {
+ public C get(DeviceId deviceId) {
checkNotNull(deviceId);
- return withDeviceLock(() -> doGetClient(deviceId), deviceId);
+ return withDeviceLock(() -> clients.get(deviceId), deviceId);
}
- private C doGetClient(DeviceId deviceId) {
- if (!clientKeys.containsKey(deviceId)) {
+ @Override
+ public void remove(DeviceId deviceId) {
+ checkNotNull(deviceId);
+ withDeviceLock(() -> {
+ final C client = clients.remove(deviceId);
+ if (client != null) {
+ client.shutdown();
+ }
return null;
- }
- return clients.get(clientKeys.get(deviceId));
- }
-
- @Override
- public C getClient(K clientKey) {
- checkNotNull(clientKey);
- return clients.get(clientKey);
- }
-
- @Override
- public void removeClient(DeviceId deviceId) {
- checkNotNull(deviceId);
- withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
- }
-
- @Override
- public void removeClient(K clientKey) {
- checkNotNull(clientKey);
- withDeviceLock(() -> doRemoveClient(clientKey), clientKey.deviceId());
- }
-
- private Void doRemoveClient(DeviceId deviceId) {
- if (clientKeys.containsKey(deviceId)) {
- doRemoveClient(clientKeys.get(deviceId));
- }
- return null;
- }
-
- private Void doRemoveClient(K clientKey) {
- if (clients.containsKey(clientKey)) {
- clients.get(clientKey).shutdown();
- }
- if (channelIds.containsKey(clientKey.deviceId())) {
- grpcChannelController.disconnectChannel(
- channelIds.get(clientKey.deviceId()));
- }
- clientKeys.remove(clientKey.deviceId());
- clients.remove(clientKey);
- channelIds.remove(clientKey.deviceId());
- return null;
+ }, deviceId);
}
@Override
public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
checkNotNull(deviceId, "deviceId cannot be null");
- checkNotNull(deviceId, "providerId cannot be null");
+ checkNotNull(providerId, "providerId cannot be null");
checkNotNull(listener, "listener cannot be null");
deviceAgentListeners.putIfAbsent(deviceId, Maps.newConcurrentMap());
deviceAgentListeners.get(deviceId).put(providerId, listener);
@@ -277,7 +167,7 @@
// We should have only one event delivery mechanism. We have two now
// because we have two different types of events, DeviceAgentEvent and
// controller/protocol specific ones (e.g. P4Runtime or gNMI).
- // TODO: extend device agent event to allow delivery protocol-specific
+ // TODO: extend device agent event to allow delivery of protocol-specific
// events, e.g. packet-in
checkNotNull(event);
if (deviceAgentListeners.containsKey(event.subject())) {
@@ -296,7 +186,7 @@
}
}
- private String clientName(GrpcClientKey key) {
- return format("%s client for %s", key.serviceName(), key.deviceId());
+ private String clientName(DeviceId deviceId) {
+ return format("%s client for %s", serviceName, deviceId);
}
}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index 96a1671..3be7706 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -16,18 +16,16 @@
package org.onosproject.grpc.ctl;
-import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Striped;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.grpc.api.GrpcChannelController;
-import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.proto.dummy.Dummy;
-import org.onosproject.grpc.proto.dummy.DummyServiceGrpc;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -38,16 +36,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLException;
+import java.net.URI;
import java.util.Dictionary;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG;
import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG_DEFAULT;
@@ -61,6 +62,12 @@
})
public class GrpcChannelControllerImpl implements GrpcChannelController {
+ private static final String GRPC = "grpc";
+ private static final String GRPCS = "grpcs";
+
+ private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
+ private static final int MEGABYTES = 1024 * 1024;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService componentConfigService;
@@ -72,8 +79,8 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private Map<GrpcChannelId, ManagedChannel> channels;
- private Map<GrpcChannelId, GrpcLoggingInterceptor> interceptors;
+ private Map<URI, ManagedChannel> channels;
+ private Map<URI, GrpcLoggingInterceptor> interceptors;
private final Striped<Lock> channelLocks = Striped.lock(30);
@@ -109,129 +116,109 @@
}
@Override
- public ManagedChannel connectChannel(GrpcChannelId channelId,
- ManagedChannelBuilder<?> channelBuilder) {
- checkNotNull(channelId);
- checkNotNull(channelBuilder);
-
- Lock lock = channelLocks.get(channelId);
- lock.lock();
-
- try {
- if (channels.containsKey(channelId)) {
- throw new IllegalArgumentException(format(
- "A channel with ID '%s' already exists", channelId));
- }
-
- final GrpcLoggingInterceptor interceptor = new GrpcLoggingInterceptor(
- channelId, enableMessageLog);
- channelBuilder.intercept(interceptor);
-
- ManagedChannel channel = channelBuilder.build();
- // Forced connection API is still experimental. Use workaround...
- // channel.getState(true);
- try {
- doDummyMessage(channel);
- } catch (StatusRuntimeException e) {
- interceptor.close();
- shutdownNowAndWait(channel, channelId);
- throw e;
- }
- // If here, channel is open.
- channels.put(channelId, channel);
- interceptors.put(channelId, interceptor);
- return channel;
- } finally {
- lock.unlock();
- }
- }
-
- private void doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
- DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
- .newBlockingStub(channel)
- .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- try {
- //noinspection ResultOfMethodCallIgnored
- dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
- .getDefaultInstance());
- } catch (StatusRuntimeException e) {
- if (!e.getStatus().equals(Status.UNIMPLEMENTED)) {
- // UNIMPLEMENTED means that the server received our message but
- // doesn't know how to handle it. Hence, channel is open.
- throw e;
- }
- }
+ public ManagedChannel create(URI channelUri) {
+ return create(channelUri, makeChannelBuilder(channelUri));
}
@Override
- public void disconnectChannel(GrpcChannelId channelId) {
- checkNotNull(channelId);
+ public ManagedChannel create(URI channelUri, ManagedChannelBuilder<?> channelBuilder) {
+ checkNotNull(channelUri);
+ checkNotNull(channelBuilder);
- Lock lock = channelLocks.get(channelId);
- lock.lock();
+ channelLocks.get(channelUri).lock();
try {
- final ManagedChannel channel = channels.remove(channelId);
- if (channel != null) {
- shutdownNowAndWait(channel, channelId);
+ if (channels.containsKey(channelUri)) {
+ throw new IllegalArgumentException(format(
+ "A channel with ID '%s' already exists", channelUri));
}
- final GrpcLoggingInterceptor interceptor = interceptors.remove(channelId);
+
+ log.info("Creating new gRPC channel {}...", channelUri);
+
+ final GrpcLoggingInterceptor interceptor = new GrpcLoggingInterceptor(
+ channelUri, enableMessageLog);
+ channelBuilder.intercept(interceptor);
+
+ final ManagedChannel channel = channelBuilder.build();
+
+ channels.put(channelUri, channelBuilder.build());
+ interceptors.put(channelUri, interceptor);
+
+ return channel;
+ } finally {
+ channelLocks.get(channelUri).unlock();
+ }
+ }
+
+ private NettyChannelBuilder makeChannelBuilder(URI channelUri) {
+
+ checkArgument(channelUri.getScheme().equals(GRPC)
+ || channelUri.getScheme().equals(GRPCS),
+ format("Server URI scheme must be %s or %s", GRPC, GRPCS));
+ checkArgument(!isNullOrEmpty(channelUri.getHost()),
+ "Server host address should not be empty");
+ checkArgument(channelUri.getPort() > 0 && channelUri.getPort() <= 65535,
+ "Invalid server port");
+
+ final boolean useTls = channelUri.getScheme().equals(GRPCS);
+
+ final NettyChannelBuilder channelBuilder = NettyChannelBuilder
+ .forAddress(channelUri.getHost(),
+ channelUri.getPort())
+ .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
+
+ if (useTls) {
+ try {
+ // Accept any server certificate; this is insecure and
+ // should not be used in production.
+ final SslContext sslContext = GrpcSslContexts.forClient().trustManager(
+ InsecureTrustManagerFactory.INSTANCE).build();
+ channelBuilder.sslContext(sslContext).useTransportSecurity();
+ } catch (SSLException e) {
+ log.error("Failed to build SSL context", e);
+ return null;
+ }
+ } else {
+ channelBuilder.usePlaintext();
+ }
+
+ return channelBuilder;
+ }
+
+ @Override
+ public void destroy(URI channelUri) {
+ checkNotNull(channelUri);
+
+ channelLocks.get(channelUri).lock();
+ try {
+ final ManagedChannel channel = channels.remove(channelUri);
+ if (channel != null) {
+ shutdownNowAndWait(channel, channelUri);
+ }
+ final GrpcLoggingInterceptor interceptor = interceptors.remove(channelUri);
if (interceptor != null) {
interceptor.close();
}
} finally {
- lock.unlock();
+ channelLocks.get(channelUri).unlock();
}
}
- private void shutdownNowAndWait(ManagedChannel channel, GrpcChannelId channelId) {
+ private void shutdownNowAndWait(ManagedChannel channel, URI channelUri) {
try {
if (!channel.shutdownNow()
.awaitTermination(5, TimeUnit.SECONDS)) {
- log.error("Channel '{}' didn't terminate, although we " +
- "triggered a shutdown and waited",
- channelId);
+ log.error("Channel {} did not terminate properly",
+ channelUri);
}
} catch (InterruptedException e) {
- log.warn("Channel {} didn't shutdown in time", channelId);
+ log.warn("Channel {} didn't shutdown in time", channelUri);
Thread.currentThread().interrupt();
}
}
@Override
- public Map<GrpcChannelId, ManagedChannel> getChannels() {
- return ImmutableMap.copyOf(channels);
- }
-
- @Override
- public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
- checkNotNull(channelId);
-
- Lock lock = channelLocks.get(channelId);
- lock.lock();
- try {
- return Optional.ofNullable(channels.get(channelId));
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public CompletableFuture<Boolean> probeChannel(GrpcChannelId channelId) {
- final ManagedChannel channel = channels.get(channelId);
- if (channel == null) {
- log.warn("Unable to find any channel with ID {}, cannot send probe",
- channelId);
- return CompletableFuture.completedFuture(false);
- }
- return CompletableFuture.supplyAsync(() -> {
- try {
- doDummyMessage(channel);
- return true;
- } catch (StatusRuntimeException e) {
- log.debug("Probe for {} failed", channelId);
- log.debug("", e);
- return false;
- }
- });
+ public Optional<ManagedChannel> get(URI channelUri) {
+ checkNotNull(channelUri);
+ return Optional.ofNullable(channels.get(channelUri));
}
}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java
index 45ec5ba..6f06c11 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java
@@ -26,12 +26,12 @@
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.protobuf.lite.ProtoLiteUtils;
-import org.onosproject.grpc.api.GrpcChannelId;
import org.slf4j.Logger;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringJoiner;
@@ -55,13 +55,13 @@
private static final Logger log = getLogger(GrpcLoggingInterceptor.class);
private final AtomicLong callIdGenerator = new AtomicLong();
- private final GrpcChannelId channelId;
+ private final URI channelUri;
private final AtomicBoolean enabled;
private FileWriter writer;
- GrpcLoggingInterceptor(GrpcChannelId channelId, AtomicBoolean enabled) {
- this.channelId = channelId;
+ GrpcLoggingInterceptor(URI channelUri, AtomicBoolean enabled) {
+ this.channelUri = channelUri;
this.enabled = enabled;
}
@@ -69,14 +69,13 @@
if (writer != null) {
return true;
}
- final String safeChName = channelId.id()
- .replaceAll("[^A-Za-z0-9]", "_");
- final String fileName = format("grpc_%s_", safeChName).toLowerCase();
+ final String safeChName = channelUri.toString()
+ .replaceAll("[^A-Za-z0-9]", "_").toLowerCase();
try {
- final File tmpFile = File.createTempFile(fileName, ".log");
+ final File tmpFile = File.createTempFile(safeChName + "_", ".log");
this.writer = new FileWriter(tmpFile);
log.info("Created gRPC call log file for channel {}: {}",
- channelId, tmpFile.getAbsolutePath());
+ channelUri, tmpFile.getAbsolutePath());
return true;
} catch (IOException e) {
log.error("Unable to initialize gRPC call log writer", e);
@@ -90,7 +89,7 @@
return;
}
try {
- log.info("Closing log writer for {}...", channelId);
+ log.info("Closing log writer for {}...", channelUri);
writer.close();
} catch (IOException e) {
log.error("Unable to close gRPC call log writer", e);
diff --git a/protocols/grpc/proto/BUILD b/protocols/grpc/proto/BUILD
deleted file mode 100644
index 48249c0..0000000
--- a/protocols/grpc/proto/BUILD
+++ /dev/null
@@ -1,11 +0,0 @@
-load("//tools/build/bazel:osgi_java_library.bzl", "osgi_proto_jar")
-
-osgi_proto_jar(
- grpc_proto_lib = ":dummy_proto",
- proto_libs = [":dummy_proto"],
-)
-
-proto_library(
- name = "dummy_proto",
- srcs = ["dummy.proto"],
-)
diff --git a/protocols/grpc/proto/dummy.proto b/protocols/grpc/proto/dummy.proto
deleted file mode 100644
index 003f403..0000000
--- a/protocols/grpc/proto/dummy.proto
+++ /dev/null
@@ -1,13 +0,0 @@
-syntax = "proto3";
-
-option java_package = "org.onosproject.grpc.proto.dummy";
-
-package dummy;
-
-service DummyService {
- rpc SayHello (DummyMessageThatNoOneWouldReallyUse) returns (DummyMessageThatNoOneWouldReallyUse) {
- }
-}
-
-message DummyMessageThatNoOneWouldReallyUse {
-}
diff --git a/protocols/grpc/utils/BUILD b/protocols/grpc/utils/BUILD
new file mode 100644
index 0000000..7baaf52
--- /dev/null
+++ b/protocols/grpc/utils/BUILD
@@ -0,0 +1,8 @@
+COMPILE_DEPS = CORE_DEPS + [
+ "//protocols/grpc/api:onos-protocols-grpc-api",
+ "@io_grpc_grpc_java//core",
+]
+
+osgi_jar(
+ deps = COMPILE_DEPS,
+)
diff --git a/protocols/grpc/utils/src/main/java/org/onosproject/grpc/utils/AbstractGrpcHandlerBehaviour.java b/protocols/grpc/utils/src/main/java/org/onosproject/grpc/utils/AbstractGrpcHandlerBehaviour.java
new file mode 100644
index 0000000..c66dede
--- /dev/null
+++ b/protocols/grpc/utils/src/main/java/org/onosproject/grpc/utils/AbstractGrpcHandlerBehaviour.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.utils;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.onosproject.grpc.api.GrpcClient;
+import org.onosproject.grpc.api.GrpcClientController;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.config.basics.BasicDeviceConfig;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Abstract implementation of HandlerBehaviour for gNMI-based devices.
+ *
+ * @param <CLIENT> gRPC client class
+ * @param <CTRL> gRPC controller class
+ */
+public class AbstractGrpcHandlerBehaviour
+ <CLIENT extends GrpcClient, CTRL extends GrpcClientController<CLIENT>>
+ extends AbstractHandlerBehaviour {
+
+ static final ConcurrentMap<DeviceId, URI> CHANNEL_URIS = Maps.newConcurrentMap();
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ final Class<CTRL> controllerClass;
+ protected DeviceId deviceId;
+ protected DeviceService deviceService;
+ protected CLIENT client;
+
+ public AbstractGrpcHandlerBehaviour(Class<CTRL> controllerClass) {
+ this.controllerClass = controllerClass;
+ }
+
+ protected boolean setupBehaviour(String opName) {
+ deviceId = handler().data().deviceId();
+ deviceService = handler().get(DeviceService.class);
+ client = getClientByNetcfg();
+ if (client == null) {
+ log.warn("Missing client for {}, aborting {}", deviceId, opName);
+ return false;
+ }
+
+ return true;
+ }
+
+ private CLIENT getClientByNetcfg() {
+ // Check if there's a channel for this device and if we created it with
+ // the same URI of that derived from the current netcfg. This makes sure
+ // we return null if the netcfg changed after we created the channel.
+ if (!CHANNEL_URIS.containsKey(data().deviceId()) ||
+ !CHANNEL_URIS.get(data().deviceId()).equals(mgmtUriFromNetcfg())) {
+ return null;
+ }
+ return handler().get(controllerClass).get(data().deviceId());
+ }
+
+ protected URI mgmtUriFromNetcfg() {
+ deviceId = handler().data().deviceId();
+
+ final BasicDeviceConfig cfg = handler().get(NetworkConfigService.class)
+ .getConfig(deviceId, BasicDeviceConfig.class);
+ if (cfg == null || Strings.isNullOrEmpty(cfg.managementAddress())) {
+ log.error("Missing or invalid config for {}, cannot derive " +
+ "gRPC server endpoints", deviceId);
+ return null;
+ }
+
+ try {
+ return new URI(cfg.managementAddress());
+ } catch (URISyntaxException e) {
+ log.error("Management address of {} is not a valid URI: {}",
+ deviceId, cfg.managementAddress());
+ return null;
+ }
+ }
+}
diff --git a/protocols/grpc/utils/src/main/java/org/onosproject/grpc/utils/AbstractGrpcHandshaker.java b/protocols/grpc/utils/src/main/java/org/onosproject/grpc/utils/AbstractGrpcHandshaker.java
new file mode 100644
index 0000000..0183133
--- /dev/null
+++ b/protocols/grpc/utils/src/main/java/org/onosproject/grpc/utils/AbstractGrpcHandshaker.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.utils;
+
+import com.google.common.util.concurrent.Striped;
+import io.grpc.ManagedChannel;
+import org.onosproject.grpc.api.GrpcChannelController;
+import org.onosproject.grpc.api.GrpcClient;
+import org.onosproject.grpc.api.GrpcClientController;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.device.DeviceHandshaker;
+import org.onosproject.net.provider.ProviderId;
+
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+/**
+ * Abstract implementation of DeviceHandshaker that uses gRPC to establish a
+ * connection to the device.
+ *
+ * @param <CLIENT> gRPC client class
+ * @param <CTRL> gRPC controller class
+ */
+public abstract class AbstractGrpcHandshaker
+ <CLIENT extends GrpcClient, CTRL extends GrpcClientController<CLIENT>>
+ extends AbstractGrpcHandlerBehaviour<CLIENT, CTRL>
+ implements DeviceHandshaker {
+
+ /**
+ * Creates a new instance of this behaviour for the given gRPC controller
+ * class.
+ *
+ * @param controllerClass gRPC controller class
+ */
+ public AbstractGrpcHandshaker(Class<CTRL> controllerClass) {
+ super(controllerClass);
+ }
+
+ private static final Striped<Lock> DEVICE_LOCKS = Striped.lock(10);
+
+ @Override
+ public boolean connect() {
+ final GrpcChannelController channelController = handler().get(
+ GrpcChannelController.class);
+ final CTRL clientController = handler().get(controllerClass);
+ final DeviceId deviceId = data().deviceId();
+
+ final URI netcfgUri = mgmtUriFromNetcfg();
+ if (netcfgUri == null) {
+ return false;
+ }
+
+ DEVICE_LOCKS.get(deviceId).lock();
+ try {
+ if (clientController.get(deviceId) != null) {
+ throw new IllegalStateException(
+ "A client for this device already exists");
+ }
+
+ // Create or get an existing channel. We support sharing the same
+ // channel by different drivers for the same device.
+ final ManagedChannel channel;
+ final URI existingChannelUri = CHANNEL_URIS.get(deviceId);
+ if (existingChannelUri != null) {
+ if (!existingChannelUri.equals(netcfgUri)) {
+ throw new IllegalStateException(
+ "A gRPC channel with different URI already " +
+ "exists for this device");
+ }
+ channel = channelController.get(existingChannelUri)
+ .orElseThrow(() -> new IllegalStateException(
+ "Missing gRPC channel in controller"));
+ } else {
+ try {
+ channel = channelController.create(netcfgUri);
+ } catch (IllegalArgumentException ex) {
+ throw new IllegalStateException(
+ "A gRPC channel with same URI already exists", ex);
+ }
+ // Store channel URI for future use.
+ CHANNEL_URIS.put(deviceId, netcfgUri);
+ // Trigger connection.
+ channel.getState(true);
+ }
+
+ return clientController.create(deviceId, channel);
+ } finally {
+ DEVICE_LOCKS.get(deviceId).unlock();
+ }
+ }
+
+ @Override
+ public boolean hasConnection() {
+ final DeviceId deviceId = data().deviceId();
+ final URI netcfgUri = mgmtUriFromNetcfg();
+ // If a client already exists for this device, but the netcfg with the
+ // server endpoints has changed, this will return false.
+ DEVICE_LOCKS.get(deviceId).lock();
+ try {
+ final URI existingChannelUri = CHANNEL_URIS.get(deviceId);
+ return existingChannelUri != null &&
+ existingChannelUri.equals(netcfgUri) &&
+ handler().get(GrpcChannelController.class)
+ .get(existingChannelUri).isPresent() &&
+ handler().get(controllerClass)
+ .get(deviceId) != null;
+ } finally {
+ DEVICE_LOCKS.get(deviceId).unlock();
+ }
+ }
+
+ @Override
+ public void disconnect() {
+ final DeviceId deviceId = data().deviceId();
+ final URI netcfgUri = mgmtUriFromNetcfg();
+ // This removes any clients and channels associated with this device ID.
+ DEVICE_LOCKS.get(deviceId).lock();
+ try {
+ final URI existingChannelUri = CHANNEL_URIS.remove(deviceId);
+ handler().get(controllerClass).remove(deviceId);
+ if (existingChannelUri != null) {
+ handler().get(GrpcChannelController.class).destroy(existingChannelUri);
+ }
+ if (netcfgUri != null) {
+ // This should not be needed if we are sure there can never be
+ // two channels for the same device.
+ handler().get(GrpcChannelController.class).destroy(netcfgUri);
+ }
+ } finally {
+ DEVICE_LOCKS.get(deviceId).unlock();
+ }
+ }
+
+ @Override
+ public boolean isReachable() {
+ return setupBehaviour("isReachable()") && client.isServerReachable();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> probeReachability() {
+ if (!setupBehaviour("probeReachability()")) {
+ return completedFuture(false);
+ }
+ return client.probeService();
+ }
+
+ @Override
+ public void addDeviceAgentListener(ProviderId providerId, DeviceAgentListener listener) {
+ // Don't use controller/deviceId class variables as they might be uninitialized.
+ handler().get(controllerClass)
+ .addDeviceAgentListener(data().deviceId(), providerId, listener);
+ }
+
+ @Override
+ public void removeDeviceAgentListener(ProviderId providerId) {
+ // Don't use controller/deviceId class variable as they might be uninitialized.
+ handler().get(controllerClass)
+ .removeDeviceAgentListener(data().deviceId(), providerId);
+ }
+}
diff --git a/protocols/grpc/utils/src/main/java/org/onosproject/grpc/utils/package-info.java b/protocols/grpc/utils/src/main/java/org/onosproject/grpc/utils/package-info.java
new file mode 100644
index 0000000..ab13d88
--- /dev/null
+++ b/protocols/grpc/utils/src/main/java/org/onosproject/grpc/utils/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * gRPC protocol utils.
+ */
+package org.onosproject.grpc.utils;
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
deleted file mode 100644
index 452e978..0000000
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.api;
-
-import com.google.common.annotations.Beta;
-import org.onosproject.grpc.api.GrpcClientKey;
-import org.onosproject.net.DeviceId;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URLDecoder;
-
-import static java.lang.String.format;
-
-/**
- * Key that uniquely identifies a P4Runtime client.
- */
-@Beta
-public final class P4RuntimeClientKey extends GrpcClientKey {
-
- private static final String DEVICE_ID_PARAM = "device_id=";
-
- private static final String P4RUNTIME = "P4Runtime";
- private final long p4DeviceId;
-
- /**
- * Creates a new client key. The server URI is expected to carry the
- * P4runtime server-internal 'device_id' as a param in the query string. For
- * example, grpc://10.0.0.1:5001?device_id=1
- *
- * @param deviceId ONOS device ID
- * @param serverUri P4Runtime server URI
- */
- public P4RuntimeClientKey(DeviceId deviceId, URI serverUri) {
- super(P4RUNTIME, deviceId, serverUri);
- this.p4DeviceId = extractP4DeviceId(serverUri);
- }
-
- private static Long extractP4DeviceId(URI uri) {
- String[] segments = uri.getRawQuery().split("&");
- try {
- for (String s : segments) {
- if (s.startsWith(DEVICE_ID_PARAM)) {
- return Long.parseUnsignedLong(
- URLDecoder.decode(
- s.substring(DEVICE_ID_PARAM.length()), "utf-8"));
- }
- }
- } catch (UnsupportedEncodingException e) {
- throw new IllegalArgumentException(format(
- "Unable to decode P4Runtime-interval device_id from URI %s: %s",
- uri, e.toString()));
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException(format(
- "Invalid P4Runtime-interval device_id in URI %s: %s",
- uri, e.toString()));
- }
- throw new IllegalArgumentException(format(
- "Missing P4Runtime-interval device_id in URI %s",
- uri));
- }
-
- /**
- * Returns the P4Runtime server-internal device ID.
- *
- * @return P4Runtime server-internal device ID
- */
- public long p4DeviceId() {
- return p4DeviceId;
- }
-}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 18e925a..fec4389 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -25,7 +25,7 @@
*/
@Beta
public interface P4RuntimeController
- extends GrpcClientController<P4RuntimeClientKey, P4RuntimeClient>,
+ extends GrpcClientController<P4RuntimeClient>,
ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java
index 81928c1..2d9cb94 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java
@@ -36,27 +36,30 @@
* blob") to be used in the P4Runtime's {@code SetPipelineConfig} message.
* Returns true if the operations was successful, false otherwise.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @param pipeconf pipeconf
* @param deviceData target-specific data
* @return completable future, true if the operations was successful, false
* otherwise.
*/
CompletableFuture<Boolean> setPipelineConfig(
- PiPipeconf pipeconf, ByteBuffer deviceData);
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData);
/**
- * Same as {@link #setPipelineConfig(PiPipeconf, ByteBuffer)}, but blocks
- * execution.
+ * Same as {@link #setPipelineConfig(long, PiPipeconf, ByteBuffer)}, but
+ * blocks execution.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @param pipeconf pipeconf
* @param deviceData target-specific data
* @return true if the operations was successful, false otherwise.
*/
default boolean setPipelineConfigSync(
- PiPipeconf pipeconf, ByteBuffer deviceData) {
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData) {
checkNotNull(pipeconf);
checkNotNull(deviceData);
- return Futures.getUnchecked(setPipelineConfig(pipeconf, deviceData));
+ return Futures.getUnchecked(setPipelineConfig(
+ p4DeviceId, pipeconf, deviceData));
}
/**
@@ -66,48 +69,53 @@
* P4Info and target-specific data for comparison.
* <p>
* This method is expected to return {@code true} if invoked after calling
- * {@link #setPipelineConfig(PiPipeconf, ByteBuffer)} with the same
+ * {@link #setPipelineConfig(long, PiPipeconf, ByteBuffer)} with the same
* parameters.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @param pipeconf pipeconf
* @param deviceData target-specific data
* @return completable future, true if the device has the given pipeconf
* set, false otherwise.
*/
CompletableFuture<Boolean> isPipelineConfigSet(
- PiPipeconf pipeconf, ByteBuffer deviceData);
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData);
/**
- * Same as {@link #isPipelineConfigSet(PiPipeconf, ByteBuffer)} but blocks
- * execution.
+ * Same as {@link #isPipelineConfigSet(long, PiPipeconf, ByteBuffer)} but
+ * blocks execution.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @param pipeconf pipeconf
* @param deviceData target-specific data
* @return true if the device has the given pipeconf set, false otherwise.
*/
default boolean isPipelineConfigSetSync(
- PiPipeconf pipeconf, ByteBuffer deviceData) {
- return Futures.getUnchecked(isPipelineConfigSet(pipeconf, deviceData));
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData) {
+ return Futures.getUnchecked(isPipelineConfigSet(
+ p4DeviceId, pipeconf, deviceData));
}
/**
* Returns true if the device has a pipeline config set, false otherwise.
* <p>
* This method is expected to return {@code true} if invoked after
- * successfully calling {@link #setPipelineConfig(PiPipeconf, ByteBuffer)}
- * with any parameter.
+ * successfully calling {@link #setPipelineConfig(long, PiPipeconf,
+ * ByteBuffer)} with any parameter.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @return completable future, true if the device has a pipeline config set,
* false otherwise.
*/
- CompletableFuture<Boolean> isAnyPipelineConfigSet();
+ CompletableFuture<Boolean> isAnyPipelineConfigSet(long p4DeviceId);
/**
- * Same as {@link #isAnyPipelineConfigSet()}, but blocks execution.
+ * Same as {@link #isAnyPipelineConfigSet(long)}, but blocks execution.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @return true if the device has a pipeline config set, false otherwise.
*/
- default boolean isAnyPipelineConfigSetSync() {
- return Futures.getUnchecked(isAnyPipelineConfigSet());
+ default boolean isAnyPipelineConfigSetSync(long p4DeviceId) {
+ return Futures.getUnchecked(isAnyPipelineConfigSet(p4DeviceId));
}
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java
index d4c3f61..b99aef3 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeReadClient.java
@@ -35,12 +35,14 @@
/**
* Returns a new {@link ReadRequest} instance that can bed used to build a
- * batched read request, for the given pipeconf.
+ * batched read request, for the given P4Runtime-internal device ID and
+ * pipeconf.
*
- * @param pipeconf pipeconf
+ * @param p4DeviceId P4Runtime-internal device ID
+ * @param pipeconf pipeconf
* @return new read request
*/
- ReadRequest read(PiPipeconf pipeconf);
+ ReadRequest read(long p4DeviceId, PiPipeconf pipeconf);
/**
* Abstraction of a batched P4Runtime read request. Multiple entities can be
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java
index 555f906..3ef6843 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java
@@ -32,48 +32,58 @@
public interface P4RuntimeStreamClient {
/**
- * Opportunistically opens a session with the server by starting a
- * StreamChannel RPC and sends a {@code MasterArbitrationUpdate} message
- * with the given election ID. The {@code master} boolean flag is used to
- * indicated if we are trying to became master or not. If false, the
- * implementation might delay sending the {@code MasterArbitrationUpdate}
- * message until another node becomes master with a higher election ID.
+ * Opportunistically opens a session with the server for the given
+ * P4Runtime-internal device ID by starting a StreamChannel RPC and sending
+ * a {@code MasterArbitrationUpdate} message with the given election ID. The
+ * {@code master} boolean flag is used to indicated if we are trying to
+ * became master or not. If false, the implementation might delay sending
+ * the {@code MasterArbitrationUpdate} message until another node becomes
+ * master with a higher election ID.
* <p>
* If the server acknowledges this client as master, the {@link
* P4RuntimeController} is expected to generate a {@link
* org.onosproject.net.device.DeviceAgentEvent} with type {@link
* org.onosproject.net.device.DeviceAgentEvent.Type#ROLE_MASTER}.
*
- * @param master true if we are trying to become master
+ * @param p4DeviceId P4Runtime-internal device ID
+ * @param master true if we are trying to become master
* @param electionId election ID
*/
- void setMastership(boolean master, BigInteger electionId);
+ void setMastership(long p4DeviceId, boolean master, BigInteger electionId);
/**
* Returns true if the StreamChannel RPC is active and hence the P4Runtime
- * session is open, false otherwise.
+ * session for the given P4Runtime-internal device ID is open, false
+ * otherwise.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @return boolean
*/
- boolean isSessionOpen();
+ boolean isSessionOpen(long p4DeviceId);
/**
- * Closes the session to the server by terminating the Stream RPC.
- */
- void closeSession();
-
- /**
- * Returns true if this client is master for the server, false otherwise.
+ * Closes the session to the server by terminating the StreamChannel RPC for
+ * the given P4Runtime-internal device ID.
*
+ * @param p4DeviceId P4Runtime-internal device ID
+ */
+ void closeSession(long p4DeviceId);
+
+ /**
+ * Returns true if this client is master for the given P4Runtime-internal
+ * device ID, false otherwise.
+ *
+ * @param p4DeviceId P4Runtime-internal device ID
* @return boolean
*/
- boolean isMaster();
+ boolean isMaster(long p4DeviceId);
/**
- * Sends a packet-out for the given pipeconf.
+ * Sends a packet-out for the given P4Runtime-internal device ID.
*
- * @param packet packet-out operation to be performed by the device
- * @param pipeconf pipeconf currently deployed on the device
+ * @param p4DeviceId P4Runtime-internal device ID
+ * @param packet packet-out operation to be performed by the device
+ * @param pipeconf pipeconf currently deployed on the device
*/
- void packetOut(PiPacketOperation packet, PiPipeconf pipeconf);
+ void packetOut(long p4DeviceId, PiPacketOperation packet, PiPipeconf pipeconf);
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java
index a6f3408..4185f70 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeWriteClient.java
@@ -33,12 +33,14 @@
/**
* Returns a new {@link WriteRequest} instance that can be used to build a
- * batched write request, for the given pipeconf.
+ * batched write request, for the given P4Runtime-internal device ID and
+ * pipeconf.
*
- * @param pipeconf pipeconf
+ * @param p4DeviceId P4Runtime-internal device ID
+ * @param pipeconf pipeconf
* @return new write request
*/
- WriteRequest write(PiPipeconf pipeconf);
+ WriteRequest write(long p4DeviceId, PiPipeconf pipeconf);
/**
* Signals the type of write operation for a given PI entity.
@@ -222,8 +224,6 @@
* responce from the device, in the same order they were added to this
* batch.
*
- *
- *
* @return entity update requests
*/
Collection<EntityUpdateRequest> pendingUpdates();
@@ -338,9 +338,9 @@
/**
* Returns the status for this PI entity. If {@link #isSuccess()}
* returns {@code true}, then this method is expected to return {@link
- * EntityUpdateStatus#OK}. If {@link EntityUpdateStatus#OTHER_ERROR}
- * is returned, further details might be provided in {@link
- * #explanation()} and {@link #throwable()}.
+ * EntityUpdateStatus#OK}. If {@link EntityUpdateStatus#OTHER_ERROR} is
+ * returned, further details might be provided in {@link #explanation()}
+ * and {@link #throwable()}.
*
* @return status
*/
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
index d54ef97..653fcfd 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
@@ -16,6 +16,8 @@
package org.onosproject.p4runtime.ctl.client;
+import com.google.common.collect.Maps;
+import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -27,7 +29,6 @@
import org.onosproject.net.pi.runtime.PiPacketOperation;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.p4runtime.api.P4RuntimeClient;
-import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
import p4.v1.P4RuntimeGrpc;
@@ -38,6 +39,7 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -50,6 +52,8 @@
public final class P4RuntimeClientImpl
extends AbstractGrpcClient implements P4RuntimeClient {
+ private static final long DEFAULT_P4_DEVICE_ID = 1;
+
// TODO: consider making timeouts configurable per-device via netcfg
/**
* Timeout in seconds for short/fast RPCs.
@@ -62,94 +66,103 @@
*/
static final int LONG_TIMEOUT_SECONDS = 60;
- private final long p4DeviceId;
private final P4RuntimeControllerImpl controller;
- private final StreamClientImpl streamClient;
private final PipelineConfigClientImpl pipelineConfigClient;
+ private final PiPipeconfService pipeconfService;
+ private final MasterElectionIdStore masterElectionIdStore;
+ private final ConcurrentMap<Long, StreamClientImpl> streamClients = Maps.newConcurrentMap();
/**
* Instantiates a new client with the given arguments.
*
- * @param clientKey client key
+ * @param deviceId device ID
* @param channel gRPC managed channel
- * @param controller P$Runtime controller instance
+ * @param controller P4Runtime controller instance
* @param pipeconfService pipeconf service instance
* @param masterElectionIdStore master election ID store
*/
- public P4RuntimeClientImpl(P4RuntimeClientKey clientKey,
+ public P4RuntimeClientImpl(DeviceId deviceId,
ManagedChannel channel,
P4RuntimeControllerImpl controller,
PiPipeconfService pipeconfService,
MasterElectionIdStore masterElectionIdStore) {
- super(clientKey, channel, true, controller);
+ super(deviceId, channel, true, controller);
checkNotNull(channel);
checkNotNull(controller);
checkNotNull(pipeconfService);
checkNotNull(masterElectionIdStore);
- this.p4DeviceId = clientKey.p4DeviceId();
this.controller = controller;
- this.streamClient = new StreamClientImpl(
- pipeconfService, masterElectionIdStore, this, controller);
+ this.pipeconfService = pipeconfService;
+ this.masterElectionIdStore = masterElectionIdStore;
this.pipelineConfigClient = new PipelineConfigClientImpl(this);
}
@Override
public void shutdown() {
- streamClient.closeSession();
+ streamClients.forEach((p4DeviceId, streamClient) ->
+ streamClient.closeSession(p4DeviceId));
super.shutdown();
}
@Override
public CompletableFuture<Boolean> setPipelineConfig(
- PiPipeconf pipeconf, ByteBuffer deviceData) {
- return pipelineConfigClient.setPipelineConfig(pipeconf, deviceData);
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData) {
+ return pipelineConfigClient.setPipelineConfig(p4DeviceId, pipeconf, deviceData);
}
@Override
public CompletableFuture<Boolean> isPipelineConfigSet(
- PiPipeconf pipeconf, ByteBuffer deviceData) {
- return pipelineConfigClient.isPipelineConfigSet(pipeconf, deviceData);
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData) {
+ return pipelineConfigClient.isPipelineConfigSet(p4DeviceId, pipeconf, deviceData);
}
@Override
- public CompletableFuture<Boolean> isAnyPipelineConfigSet() {
- return pipelineConfigClient.isAnyPipelineConfigSet();
+ public CompletableFuture<Boolean> isAnyPipelineConfigSet(long p4DeviceId) {
+ return pipelineConfigClient.isAnyPipelineConfigSet(p4DeviceId);
}
@Override
- public ReadRequest read(PiPipeconf pipeconf) {
- return new ReadRequestImpl(this, pipeconf);
+ public ReadRequest read(long p4DeviceId, PiPipeconf pipeconf) {
+ return new ReadRequestImpl(this, p4DeviceId, pipeconf);
}
@Override
- public boolean isSessionOpen() {
- return streamClient.isSessionOpen();
+ public boolean isSessionOpen(long p4DeviceId) {
+ return streamClients.containsKey(p4DeviceId) &&
+ streamClients.get(p4DeviceId).isSessionOpen(p4DeviceId);
}
@Override
- public void closeSession() {
- streamClient.closeSession();
+ public void closeSession(long p4DeviceId) {
+ if (streamClients.containsKey(p4DeviceId)) {
+ streamClients.get(p4DeviceId).closeSession(p4DeviceId);
+ }
}
@Override
- public void setMastership(boolean master, BigInteger newElectionId) {
- streamClient.setMastership(master, newElectionId);
+ public void setMastership(long p4DeviceId, boolean master, BigInteger newElectionId) {
+ streamClients.putIfAbsent(p4DeviceId, new StreamClientImpl(
+ pipeconfService, masterElectionIdStore, this, p4DeviceId, controller));
+ streamClients.get(p4DeviceId).setMastership(p4DeviceId, master, newElectionId);
}
@Override
- public boolean isMaster() {
- return streamClient.isMaster();
+ public boolean isMaster(long p4DeviceId) {
+ return streamClients.containsKey(p4DeviceId) &&
+ streamClients.get(p4DeviceId).isMaster(p4DeviceId);
}
@Override
- public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
- streamClient.packetOut(packet, pipeconf);
+ public void packetOut(long p4DeviceId, PiPacketOperation packet, PiPipeconf pipeconf) {
+ if (streamClients.containsKey(p4DeviceId)) {
+ streamClients.get(p4DeviceId).packetOut(p4DeviceId, packet, pipeconf);
+ }
}
@Override
- public WriteRequest write(PiPipeconf pipeconf) {
- return new WriteRequestImpl(this, pipeconf);
+ public WriteRequest write(long p4DeviceId, PiPipeconf pipeconf) {
+ return new WriteRequestImpl(this, p4DeviceId, pipeconf);
}
@Override
@@ -164,14 +177,14 @@
@Override
public void onError(Throwable t) {
- if (Status.fromThrowable(t).getCode() ==
- Status.Code.FAILED_PRECONDITION) {
- // Pipeline not set but service is available.
- future.complete(true);
- } else {
- log.debug("", t);
- }
- future.complete(false);
+ log.debug("", t);
+ // FIXME: The P4Runtime spec is not explicit about error
+ // codes when a pipeline config is not set, which would
+ // be useful here as it's an indication that the
+ // service is available. As a workaround, we simply
+ // check the channel state.
+ future.complete(ConnectivityState.READY.equals(
+ channel.getState(false)));
}
@Override
@@ -179,6 +192,9 @@
// Ignore, unary call.
}
};
+ // Get any p4DeviceId under the control of this client or a default one.
+ final long p4DeviceId = streamClients.isEmpty() ? DEFAULT_P4_DEVICE_ID
+ : streamClients.keySet().iterator().next();
// Use long timeout as the device might return the full P4 blob
// (e.g. server does not support cookie), over a slow network.
execRpc(s -> s.getForwardingPipelineConfig(
@@ -207,15 +223,6 @@
}
/**
- * Returns the P4Runtime-internal device ID associated with this client.
- *
- * @return P4Runtime-internal device ID
- */
- long p4DeviceId() {
- return this.p4DeviceId;
- }
-
- /**
* Returns the ONOS device ID associated with this client.
*
* @return ONOS device ID
@@ -226,14 +233,20 @@
/**
* Returns the election ID last used in a MasterArbitrationUpdate message
- * sent by the client to the server. No guarantees are given that this is
- * the current election ID associated to the session, nor that the server
- * has acknowledged this value as valid.
+ * sent by the client to the server for the given P4Runtime-internal device
+ * ID. No guarantees are given that this is the current election ID
+ * associated to the session, nor that the server has acknowledged this
+ * value as valid.
*
+ * @param p4DeviceId P4Runtime-internal device ID
* @return election ID uint128 protobuf message
*/
- P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
- return streamClient.lastUsedElectionId();
+ P4RuntimeOuterClass.Uint128 lastUsedElectionId(long p4DeviceId) {
+ if (streamClients.containsKey(p4DeviceId)) {
+ return streamClients.get(p4DeviceId).lastUsedElectionId();
+ } else {
+ return P4RuntimeOuterClass.Uint128.getDefaultInstance();
+ }
}
/**
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
index b0b6c57..1dffc9c 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
@@ -63,9 +63,9 @@
@Override
public CompletableFuture<Boolean> setPipelineConfig(
- PiPipeconf pipeconf, ByteBuffer deviceData) {
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData) {
- if (!client.isSessionOpen()) {
+ if (!client.isSessionOpen(p4DeviceId)) {
log.warn("Dropping set pipeline config request for {}, session is CLOSED",
client.deviceId());
return completedFuture(false);
@@ -86,8 +86,8 @@
final SetForwardingPipelineConfigRequest requestMsg =
SetForwardingPipelineConfigRequest
.newBuilder()
- .setDeviceId(client.p4DeviceId())
- .setElectionId(client.lastUsedElectionId())
+ .setDeviceId(p4DeviceId)
+ .setElectionId(client.lastUsedElectionId(p4DeviceId))
.setAction(VERIFY_AND_COMMIT)
.setConfig(pipelineConfigMsg)
.build();
@@ -159,15 +159,15 @@
@Override
public CompletableFuture<Boolean> isPipelineConfigSet(
- PiPipeconf pipeconf, ByteBuffer expectedDeviceData) {
- return getPipelineCookieFromServer()
+ long p4DeviceId, PiPipeconf pipeconf, ByteBuffer expectedDeviceData) {
+ return getPipelineCookieFromServer(p4DeviceId)
.thenApply(cfgFromDevice -> comparePipelineConfig(
pipeconf, expectedDeviceData, cfgFromDevice));
}
@Override
- public CompletableFuture<Boolean> isAnyPipelineConfigSet() {
- return getPipelineCookieFromServer().thenApply(Objects::nonNull);
+ public CompletableFuture<Boolean> isAnyPipelineConfigSet(long p4DeviceId) {
+ return getPipelineCookieFromServer(p4DeviceId).thenApply(Objects::nonNull);
}
private boolean comparePipelineConfig(
@@ -211,11 +211,12 @@
.equals(expectedCfg.getP4Info());
}
- private CompletableFuture<ForwardingPipelineConfig> getPipelineCookieFromServer() {
+ private CompletableFuture<ForwardingPipelineConfig> getPipelineCookieFromServer(
+ long p4DeviceId) {
final GetForwardingPipelineConfigRequest request =
GetForwardingPipelineConfigRequest
.newBuilder()
- .setDeviceId(client.p4DeviceId())
+ .setDeviceId(p4DeviceId)
.setResponseType(COOKIE_ONLY)
.build();
final CompletableFuture<ForwardingPipelineConfig> future = new CompletableFuture<>();
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java
index c85c7e8..29a7169 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java
@@ -51,11 +51,11 @@
private final PiPipeconf pipeconf;
private final P4RuntimeOuterClass.ReadRequest.Builder requestMsg;
- ReadRequestImpl(P4RuntimeClientImpl client, PiPipeconf pipeconf) {
+ ReadRequestImpl(P4RuntimeClientImpl client, long p4DeviceId, PiPipeconf pipeconf) {
this.client = client;
this.pipeconf = pipeconf;
this.requestMsg = P4RuntimeOuterClass.ReadRequest.newBuilder()
- .setDeviceId(client.p4DeviceId());
+ .setDeviceId(p4DeviceId);
}
@Override
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
index 11e91aa..a14f960 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
@@ -55,7 +55,8 @@
/**
* Implementation of P4RuntimeStreamClient. Handles P4Runtime StreamChannel RPC
- * operations, such as arbitration update and packet-in/out.
+ * operations, such as arbitration update and packet-in/out, for a given
+ * P4Runtime-internal device ID.
*/
public final class StreamClientImpl implements P4RuntimeStreamClient {
@@ -87,24 +88,27 @@
PiPipeconfService pipeconfService,
MasterElectionIdStore masterElectionIdStore,
P4RuntimeClientImpl client,
+ long p4DeviceId,
P4RuntimeControllerImpl controller) {
this.client = client;
this.deviceId = client.deviceId();
- this.p4DeviceId = client.p4DeviceId();
+ this.p4DeviceId = p4DeviceId;
this.pipeconfService = pipeconfService;
this.masterElectionIdStore = masterElectionIdStore;
this.controller = controller;
}
@Override
- public boolean isSessionOpen() {
+ public boolean isSessionOpen(long p4DeviceId) {
+ checkArgument(this.p4DeviceId == p4DeviceId);
return streamChannelManager.isOpen();
}
@Override
- public void closeSession() {
+ public void closeSession(long p4DeviceId) {
+ checkArgument(this.p4DeviceId == p4DeviceId);
synchronized (requestedToBeMaster) {
- this.masterElectionIdStore.unsetListener(deviceId);
+ this.masterElectionIdStore.unsetListener(deviceId, p4DeviceId);
streamChannelManager.teardown();
pendingElectionId = null;
requestedToBeMaster.set(false);
@@ -113,14 +117,17 @@
}
@Override
- public void setMastership(boolean master, BigInteger newElectionId) {
+ public void setMastership(long p4DeviceId, boolean master,
+ BigInteger newElectionId) {
+ checkArgument(this.p4DeviceId == p4DeviceId);
checkNotNull(newElectionId);
checkArgument(newElectionId.compareTo(BigInteger.ZERO) > 0,
"newElectionId must be a non zero positive number");
synchronized (requestedToBeMaster) {
requestedToBeMaster.set(master);
pendingElectionId = newElectionId;
- handlePendingElectionId(masterElectionIdStore.get(deviceId));
+ handlePendingElectionId(masterElectionIdStore.get(
+ deviceId, p4DeviceId));
}
}
@@ -151,7 +158,8 @@
> ARBITRATION_TIMEOUT_SECONDS * 1000;
}
if (timeoutExpired) {
- log.warn("{} arbitration timeout expired! Will send pending election ID now...",
+ log.warn("Arbitration timeout expired for {}! " +
+ "Will send pending election ID now...",
deviceId);
}
if (!timeoutExpired &&
@@ -163,35 +171,41 @@
deviceId, masterElectionId, pendingElectionId);
// Will try again as soon as the master election ID store is
// updated...
- masterElectionIdStore.setListener(deviceId, masterElectionIdListener);
+ masterElectionIdStore.setListener(
+ deviceId, p4DeviceId, masterElectionIdListener);
// ..or in ARBITRATION_RETRY_SECONDS at the latest (if we missed
// the store event).
pendingElectionIdRetryTask = SharedScheduledExecutors.newTimeout(
- () -> handlePendingElectionId(masterElectionIdStore.get(deviceId)),
+ () -> handlePendingElectionId(
+ masterElectionIdStore.get(deviceId, p4DeviceId)),
ARBITRATION_RETRY_SECONDS, TimeUnit.SECONDS);
} else {
// Send now.
log.info("Setting mastership on {}... " +
- "master={}, newElectionId={}, masterElectionId={}",
+ "master={}, newElectionId={}, " +
+ "masterElectionId={}, sessionOpen={}",
deviceId, requestedToBeMaster.get(),
- pendingElectionId, masterElectionId);
+ pendingElectionId, masterElectionId,
+ streamChannelManager.isOpen());
sendMasterArbitrationUpdate(pendingElectionId);
pendingElectionId = null;
pendingElectionIdTimestamp = 0;
// No need to listen for master election ID changes.
- masterElectionIdStore.unsetListener(deviceId);
+ masterElectionIdStore.unsetListener(deviceId, p4DeviceId);
}
}
}
@Override
- public boolean isMaster() {
+ public boolean isMaster(long p4DeviceId) {
+ checkArgument(this.p4DeviceId == p4DeviceId);
return isMaster.get();
}
@Override
- public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
- if (!isSessionOpen()) {
+ public void packetOut(long p4DeviceId, PiPacketOperation packet, PiPipeconf pipeconf) {
+ checkArgument(this.p4DeviceId == p4DeviceId);
+ if (!isSessionOpen(p4DeviceId)) {
log.warn("Dropping packet-out request for {}, session is closed",
deviceId);
return;
@@ -293,7 +307,7 @@
// and that otherwise would not be aware of changes, keeping their
// pending mastership operations forever.
final BigInteger masterElectionId = uint128ToBigInteger(msg.getElectionId());
- masterElectionIdStore.set(deviceId, masterElectionId);
+ masterElectionIdStore.set(deviceId, p4DeviceId, masterElectionId);
log.debug("Received arbitration update from {}: isMaster={}, masterElectionId={}",
deviceId, isMaster.get(), masterElectionId);
@@ -332,7 +346,7 @@
}
/**
- * A manager for the P4Runtime stream channel that opportunistically creates
+ * A manager for the P4Runtime StreamChannel RPC that opportunistically creates
* new stream RCP stubs (e.g. when one fails because of errors) and posts
* channel events via the P4Runtime controller.
*/
@@ -355,7 +369,7 @@
private void initIfRequired() {
if (requestObserver == null) {
- log.debug("Creating new stream channel for {}...", deviceId);
+ log.debug("Starting new StreamChannel RPC for {}...", deviceId);
open.set(false);
client.execRpcNoTimeout(
s -> requestObserver =
@@ -397,7 +411,7 @@
}
/**
- * Handles messages received from the device on the stream channel.
+ * Handles messages received from the device on the StreamChannel RPC.
*/
private final class InternalStreamResponseObserver
implements StreamObserver<StreamMessageResponse> {
@@ -429,7 +443,7 @@
deviceId, message.getUpdateCase());
}
} catch (Throwable ex) {
- log.error("Exception while processing stream message from {}",
+ log.error("Exception while processing StreamMessageResponse from {}",
deviceId, ex);
}
}
@@ -442,12 +456,12 @@
log.warn("{} is unreachable ({})",
deviceId, sre.getCause().getMessage());
} else {
- log.warn("Error on stream channel for {}: {}",
+ log.warn("Error on StreamChannel RPC for {}: {}",
deviceId, throwable.getMessage());
}
log.debug("", throwable);
} else {
- log.error(format("Exception on stream channel for %s",
+ log.error(format("Exception on StreamChannel RPC for %s",
deviceId), throwable);
}
streamChannelManager.teardown();
@@ -455,7 +469,7 @@
@Override
public void onCompleted() {
- log.warn("Stream channel for {} has completed", deviceId);
+ log.warn("StreamChannel RPC for {} has completed", deviceId);
streamChannelManager.teardown();
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
index fb3a5fb..481cf04 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
@@ -60,11 +60,11 @@
// set once we receive a response from the device.
private final WriteResponseImpl.Builder responseBuilder;
- WriteRequestImpl(P4RuntimeClientImpl client, PiPipeconf pipeconf) {
+ WriteRequestImpl(P4RuntimeClientImpl client, long p4DeviceId, PiPipeconf pipeconf) {
this.client = checkNotNull(client);
this.pipeconf = checkNotNull(pipeconf);
this.requestMsg = P4RuntimeOuterClass.WriteRequest.newBuilder()
- .setDeviceId(client.p4DeviceId());
+ .setDeviceId(p4DeviceId);
this.responseBuilder = WriteResponseImpl.builder(client.deviceId());
}
@@ -160,7 +160,8 @@
checkState(!submitted.getAndSet(true),
"Request has already been submitted, cannot submit again");
final P4RuntimeOuterClass.WriteRequest writeRequest = requestMsg
- .setElectionId(client.lastUsedElectionId())
+ .setElectionId(client.lastUsedElectionId(
+ requestMsg.getDeviceId()))
.build();
log.debug("Sending write request to {} with {} updates...",
client.deviceId(), writeRequest.getUpdatesCount());
@@ -186,11 +187,13 @@
future.complete(responseBuilder.setSuccessAllAndBuild());
}
}
+
@Override
public void onError(Throwable t) {
client.handleRpcError(t, "WRITE");
future.complete(responseBuilder.setErrorsAndBuild(t));
}
+
@Override
public void onCompleted() {
// Nothing to do, unary call.
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
index dff2330..04e39f7 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
@@ -17,6 +17,7 @@
package org.onosproject.p4runtime.ctl.controller;
import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.KryoNamespace;
import org.onosproject.net.DeviceId;
import org.onosproject.store.serializers.KryoNamespaces;
@@ -49,64 +50,77 @@
private static final KryoNamespace SERIALIZER = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
+ .register(Pair.class)
+ .register(Long.class)
.register(BigInteger.class)
.build();
private final Logger log = getLogger(getClass());
- private final EventuallyConsistentMapListener<DeviceId, BigInteger> mapListener =
+ private final EventuallyConsistentMapListener<Pair<DeviceId, Long>, BigInteger> mapListener =
new InternalMapListener();
- private EventuallyConsistentMap<DeviceId, BigInteger> masterElectionIds;
- private ConcurrentMap<DeviceId, MasterElectionIdListener> listeners =
+ private EventuallyConsistentMap<Pair<DeviceId, Long>, BigInteger> masterElectionIds;
+ private ConcurrentMap<Pair<DeviceId, Long>, MasterElectionIdListener> listeners =
Maps.newConcurrentMap();
@Activate
public void activate() {
- this.listeners = Maps.newConcurrentMap();
- this.masterElectionIds = storageService.<DeviceId, BigInteger>eventuallyConsistentMapBuilder()
+ listeners = Maps.newConcurrentMap();
+ masterElectionIds = storageService.<Pair<DeviceId, Long>,
+ BigInteger>eventuallyConsistentMapBuilder()
.withName("p4runtime-master-election-ids")
.withSerializer(SERIALIZER)
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.build();
- this.masterElectionIds.addListener(mapListener);
+ masterElectionIds.addListener(mapListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
- this.masterElectionIds.removeListener(mapListener);
- this.masterElectionIds.destroy();
- this.masterElectionIds = null;
- this.listeners.clear();
- this.listeners = null;
+ masterElectionIds.removeListener(mapListener);
+ masterElectionIds.destroy();
+ masterElectionIds = null;
+ listeners.clear();
+ listeners = null;
log.info("Stopped");
}
@Override
- public void set(DeviceId deviceId, BigInteger electionId) {
+ public void set(DeviceId deviceId, long p4DeviceId, BigInteger electionId) {
checkNotNull(deviceId);
checkNotNull(electionId);
- this.masterElectionIds.put(deviceId, electionId);
+ masterElectionIds.put(Pair.of(deviceId, p4DeviceId), electionId);
}
@Override
- public BigInteger get(DeviceId deviceId) {
+ public BigInteger get(DeviceId deviceId, long p4DeviceId) {
checkNotNull(deviceId);
- return this.masterElectionIds.get(deviceId);
+ return masterElectionIds.get(Pair.of(deviceId, p4DeviceId));
}
@Override
- public void remove(DeviceId deviceId) {
+ public void remove(DeviceId deviceId, long p4DeviceId) {
checkNotNull(deviceId);
- this.masterElectionIds.remove(deviceId);
+ masterElectionIds.remove(Pair.of(deviceId, p4DeviceId));
}
@Override
- public void setListener(DeviceId deviceId, MasterElectionIdListener newListener) {
+ public void removeAll(DeviceId deviceId) {
+ masterElectionIds.keySet().forEach(k -> {
+ if (k.getLeft().equals(deviceId)) {
+ masterElectionIds.remove(k);
+ }
+ });
+ }
+
+ @Override
+ public void setListener(DeviceId deviceId, long p4DeviceId,
+ MasterElectionIdListener newListener) {
checkNotNull(deviceId);
checkNotNull(newListener);
- listeners.compute(deviceId, (did, existingListener) -> {
+ listeners.compute(Pair.of(deviceId, p4DeviceId), (x, existingListener) -> {
if (existingListener == null || existingListener == newListener) {
return newListener;
} else {
@@ -117,13 +131,13 @@
}
@Override
- public void unsetListener(DeviceId deviceId) {
- listeners.remove(deviceId);
+ public void unsetListener(DeviceId deviceId, long p4DeviceId) {
+ listeners.remove(Pair.of(deviceId, p4DeviceId));
}
- private class InternalMapListener implements EventuallyConsistentMapListener<DeviceId, BigInteger> {
+ private class InternalMapListener implements EventuallyConsistentMapListener<Pair<DeviceId, Long>, BigInteger> {
@Override
- public void event(EventuallyConsistentMapEvent<DeviceId, BigInteger> event) {
+ public void event(EventuallyConsistentMapEvent<Pair<DeviceId, Long>, BigInteger> event) {
final MasterElectionIdListener listener = listeners.get(event.key());
if (listener == null) {
return;
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
index f5393d1..7e5a49f 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
@@ -21,51 +21,71 @@
import java.math.BigInteger;
/**
- * Store that keeps track of master election IDs for each device.
+ * Store that keeps track of master election IDs for each server (device) and
+ * P4Runtime-internal device ID.
*/
public interface MasterElectionIdStore {
/**
- * Sets the master election ID for the given device.
+ * Sets the master election ID for the given device and P4Runtime-internal
+ * device ID.
*
* @param deviceId device ID
+ * @param p4DeviceId P4Runtime-internal device ID
* @param electionId election ID
*/
- void set(DeviceId deviceId, BigInteger electionId);
+ void set(DeviceId deviceId, long p4DeviceId, BigInteger electionId);
/**
- * Returns the last known master election ID for the given device, or null.
+ * Returns the last known master election ID for the given device and
+ * P4Runtime-internal device ID, or null.
*
- * @param deviceId device ID
+ * @param deviceId device ID
+ * @param p4DeviceId P4Runtime-internal device ID
* @return election ID
*/
- BigInteger get(DeviceId deviceId);
+ BigInteger get(DeviceId deviceId, long p4DeviceId);
/**
- * Removes any state associated with the given device.
+ * Removes any state associated with the given device and P4Runtime-internal
+ * device ID.
+ *
+ * @param deviceId device ID
+ * @param p4DeviceId P4Runtime-internal device ID
+ */
+ void remove(DeviceId deviceId, long p4DeviceId);
+
+ /**
+ * Removes all state associated with the given device.
*
* @param deviceId device ID
*/
- void remove(DeviceId deviceId);
+ void removeAll(DeviceId deviceId);
/**
- * Sets a listener for the given device that will be invoked every time
- * there will be changes to the master election ID.
+ * Sets a listener for the given device and P4Runtime-internal device ID
+ * that will be invoked every time there will be changes to the master
+ * election ID.
*
- * @param deviceId device ID
- * @param listener listener
+ * @param deviceId device ID
+ * @param p4DeviceId P4Runtime-internal device ID
+ * @param listener listener
*/
- void setListener(DeviceId deviceId, MasterElectionIdListener listener);
+ void setListener(DeviceId deviceId, long p4DeviceId,
+ MasterElectionIdListener listener);
/**
- * Unset the listener for the given device.
+ * Unset the listener for the given device and P4Runtime-internal device
+ * ID.
*
- * @param deviceId device ID
+ * @param deviceId device ID
+ * @param p4DeviceId P4Runtime-internal device ID
*/
- void unsetListener(DeviceId deviceId);
+ void unsetListener(DeviceId deviceId, long p4DeviceId);
/**
- * Listener of master election ID changes for a specific device.
+ * Listener of master election ID changes for a specific device and
+ * P4Runtime-internal device ID.
*/
interface MasterElectionIdListener {
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
index 2256d5e..c188ee4 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
@@ -21,7 +21,6 @@
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.p4runtime.api.P4RuntimeClient;
-import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.api.P4RuntimeEventListener;
@@ -36,7 +35,7 @@
@Component(immediate = true, service = P4RuntimeController.class)
public class P4RuntimeControllerImpl
extends AbstractGrpcClientController
- <P4RuntimeClientKey, P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
+ <P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
implements P4RuntimeController {
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -46,27 +45,21 @@
private MasterElectionIdStore masterElectionIdStore;
public P4RuntimeControllerImpl() {
- super(P4RuntimeEvent.class);
+ super(P4RuntimeEvent.class, "P4Runtime");
}
@Override
- public void removeClient(DeviceId deviceId) {
- super.removeClient(deviceId);
+ public void remove(DeviceId deviceId) {
+ super.remove(deviceId);
// Assuming that when a client is removed, it is done so by all nodes,
// this is the best place to clear master election ID state.
- masterElectionIdStore.remove(deviceId);
- }
-
- @Override
- public void removeClient(P4RuntimeClientKey clientKey) {
- super.removeClient(clientKey);
- masterElectionIdStore.remove(clientKey.deviceId());
+ masterElectionIdStore.removeAll(deviceId);
}
@Override
protected P4RuntimeClient createClientInstance(
- P4RuntimeClientKey clientKey, ManagedChannel channel) {
- return new P4RuntimeClientImpl(clientKey, channel, this,
+ DeviceId deviceId, ManagedChannel channel) {
+ return new P4RuntimeClientImpl(deviceId, channel, this,
pipeconfService, masterElectionIdStore);
}
}
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
index 595e07d..596546b 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
@@ -24,27 +24,32 @@
public class MockMasterElectionIdStore implements MasterElectionIdStore {
@Override
- public void set(DeviceId deviceId, BigInteger electionId) {
+ public void set(DeviceId deviceId, long p4DeviceId, BigInteger electionId) {
}
@Override
- public BigInteger get(DeviceId deviceId) {
+ public BigInteger get(DeviceId deviceId, long p4DeviceId) {
return null;
}
@Override
- public void remove(DeviceId deviceId) {
+ public void remove(DeviceId deviceId, long p4DeviceId) {
}
@Override
- public void setListener(DeviceId deviceId, MasterElectionIdListener listener) {
+ public void removeAll(DeviceId deviceId) {
}
@Override
- public void unsetListener(DeviceId deviceId) {
+ public void setListener(DeviceId deviceId, long p4DeviceId, MasterElectionIdListener listener) {
+
+ }
+
+ @Override
+ public void unsetListener(DeviceId deviceId, long p4DeviceId) {
}
}
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index 4c4dcdf..146f102 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -25,6 +25,7 @@
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.AbstractServerImplBuilder;
import org.easymock.EasyMock;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -44,7 +45,6 @@
import org.onosproject.net.pi.runtime.PiActionProfileGroupId;
import org.onosproject.net.pi.runtime.PiActionProfileMember;
import org.onosproject.net.pi.runtime.PiActionProfileMemberId;
-import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl;
import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
@@ -55,8 +55,6 @@
import p4.v1.P4RuntimeOuterClass.WriteRequest;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collection;
import java.util.List;
@@ -64,7 +62,6 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static java.lang.String.format;
import static org.easymock.EasyMock.niceMock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -109,8 +106,6 @@
private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest";
private static final long DEFAULT_TIMEOUT_TIME = 10;
private static final Uint128 DEFAULT_ELECTION_ID = Uint128.getDefaultInstance();
- private static final String P4R_IP = "127.0.0.1";
- private static final int P4R_PORT = 50010;
private org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl client;
private P4RuntimeControllerImpl controller;
@@ -160,20 +155,23 @@
@Before
- public void setup() throws URISyntaxException {
+ public void setup() {
controller = niceMock(org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl.class);
- P4RuntimeClientKey clientKey = new P4RuntimeClientKey(DEVICE_ID, new URI(
- format("grpc://%s:%d?device_id=%d", P4R_IP, P4R_PORT, P4_DEVICE_ID)));
client = new P4RuntimeClientImpl(
- clientKey, grpcChannel, controller, new MockPipeconfService(),
+ DEVICE_ID, grpcChannel, controller, new MockPipeconfService(),
new MockMasterElectionIdStore());
}
+ @After
+ public void teardown() {
+ client.shutdown();
+ }
+
@Test
public void testInsertPiActionProfileGroup() throws Exception {
CompletableFuture<Void> complete = p4RuntimeServerImpl.expectRequests(1);
- client.write(PIPECONF).insert(GROUP).submitSync();
- assertTrue(client.write(PIPECONF).insert(GROUP).submitSync().isSuccess());
+ client.write(P4_DEVICE_ID, PIPECONF).insert(GROUP).submitSync();
+ assertTrue(client.write(P4_DEVICE_ID, PIPECONF).insert(GROUP).submitSync().isSuccess());
complete.get(DEFAULT_TIMEOUT_TIME, TimeUnit.SECONDS);
WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0);
assertEquals(1, result.getDeviceId());
@@ -202,7 +200,7 @@
@Test
public void testInsertPiActionMembers() throws Exception {
CompletableFuture<Void> complete = p4RuntimeServerImpl.expectRequests(1);
- assertTrue(client.write(PIPECONF).insert(GROUP_MEMBER_INSTANCES)
+ assertTrue(client.write(P4_DEVICE_ID, PIPECONF).insert(GROUP_MEMBER_INSTANCES)
.submitSync().isSuccess());
complete.get(DEFAULT_TIMEOUT_TIME, TimeUnit.SECONDS);
WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0);
@@ -251,7 +249,7 @@
p4RuntimeServerImpl.willReturnReadResult(responses);
CompletableFuture<Void> complete = p4RuntimeServerImpl.expectRequests(1);
- Collection<PiActionProfileGroup> groups = client.read(PIPECONF)
+ Collection<PiActionProfileGroup> groups = client.read(P4_DEVICE_ID, PIPECONF)
.actionProfileGroups(ACT_PROF_ID)
.submitSync().all(PiActionProfileGroup.class);
complete.get(DEFAULT_TIMEOUT_TIME, TimeUnit.SECONDS);
@@ -300,7 +298,7 @@
p4RuntimeServerImpl.willReturnReadResult(responses);
CompletableFuture<Void> complete = p4RuntimeServerImpl.expectRequests(1);
- Collection<PiActionProfileMember> piMembers = client.read(PIPECONF)
+ Collection<PiActionProfileMember> piMembers = client.read(P4_DEVICE_ID, PIPECONF)
.actionProfileMembers(ACT_PROF_ID).submitSync()
.all(PiActionProfileMember.class);
complete.get(DEFAULT_TIMEOUT_TIME, TimeUnit.SECONDS);
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 35a64b1..46fb7f1 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -541,7 +541,7 @@
return;
}
- if (!handshaker.isConnected()) {
+ if (!handshaker.hasConnection()) {
// Device is in the core, but driver reports there is NOT a
// connection to it. Perhaps the netcfg changed and we didn't
// pick the event?
@@ -603,15 +603,12 @@
case CONNECTION_TEARDOWN:
return () -> handleConnectionTeardown(deviceId);
case CHANNEL_OPEN:
- return () -> handleProbeAvailability(deviceId);
- case CHANNEL_CLOSED:
- return () -> markOfflineIfNeeded(deviceId);
- case PIPELINE_NOT_READY:
- return () -> markOfflineIfNeeded(deviceId);
+ case PROBE_AVAILABILITY:
case PIPELINE_READY:
return () -> handleProbeAvailability(deviceId);
- case PROBE_AVAILABILITY:
- return () -> handleProbeAvailability(deviceId);
+ case CHANNEL_CLOSED:
+ case PIPELINE_NOT_READY:
+ return () -> markOfflineIfNeeded(deviceId);
case ROLE_MASTER:
return () -> handleMastershipResponse(deviceId, MastershipRole.MASTER);
case ROLE_STANDBY:
@@ -631,29 +628,29 @@
bindPipeconfIfRequired(deviceId);
// Get handshaker.
final DeviceHandshaker handshaker = handshakerOrFail(deviceId);
- if (handshaker.isConnected() || handshakersWithListeners.containsKey(deviceId)) {
+ if (handshaker.hasConnection() || handshakersWithListeners.containsKey(deviceId)) {
throw new DeviceTaskException("connection already exists");
}
// Add device agent listener.
- handshaker.addDeviceAgentListener(id(), deviceAgentListener);
handshakersWithListeners.put(deviceId, handshaker);
+ handshaker.addDeviceAgentListener(id(), deviceAgentListener);
// Start connection via handshaker.
- if (!Futures.getUnchecked(handshaker.connect())) {
+ if (!handshaker.connect()) {
// Failed! Remove listeners.
handshaker.removeDeviceAgentListener(id());
handshakersWithListeners.remove(deviceId);
+ // Clean up connection state leftovers.
+ handshaker.disconnect();
throw new DeviceTaskException("connection failed");
}
createOrUpdateDevice(deviceId, false);
- final List<PortDescription> ports = getPortDetails(deviceId);
- providerService.updatePorts(deviceId, ports);
// From here we expect a CHANNEL_OPEN event to update availability.
}
private void handleConnectionUpdate(DeviceId deviceId) {
assertConfig(deviceId);
final DeviceHandshaker handshaker = handshakerOrFail(deviceId);
- if (!handshaker.isConnected()) {
+ if (!handshaker.hasConnection()) {
// If driver reports that a connection still exists, perhaps the
// part of the netcfg that changed does not affect the connection.
// Otherwise, remove any previous connection state from the old
@@ -675,6 +672,11 @@
assertConfig(deviceId);
providerService.deviceConnected(deviceId, getDeviceDescription(
deviceId, available));
+ if (available) {
+ // Push port descriptions.
+ final List<PortDescription> ports = getPortDetails(deviceId);
+ providerService.updatePorts(deviceId, ports);
+ }
}
private boolean probeAvailability(DeviceHandshaker handshaker) {
@@ -697,7 +699,7 @@
private void handleProbeAvailability(DeviceId deviceId) {
assertDeviceRegistered(deviceId);
- // Make device has a valid mastership role.
+ // Make sure device has a valid mastership role.
final DeviceHandshaker handshaker = handshakerOrFail(deviceId);
final MastershipRole deviceRole = handshaker.getRole();
final MastershipRole expectedRole = mastershipService.getLocalRole(deviceId);
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
index aabd92e..b7804bb 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
@@ -103,7 +103,7 @@
}
private void checkSubscription(DeviceId deviceId) {
- if (gnmiController.getClient(deviceId) == null) {
+ if (gnmiController.get(deviceId) == null) {
// Ignore devices for which a gNMI client does not exist.
return;
}
@@ -137,7 +137,7 @@
}
private void unsubscribeIfNeeded(DeviceId deviceId) {
- gnmiController.getClient(deviceId).unsubscribe();
+ gnmiController.get(deviceId).unsubscribe();
if (deviceSubscribed.remove(deviceId) != null) {
log.info("Cancelled gNMI subscription for {}", deviceId);
}
@@ -167,7 +167,7 @@
.setMode(SubscriptionMode.ON_CHANGE)
.build()).collect(Collectors.toList()))
.build();
- gnmiController.getClient(deviceId).subscribe(
+ gnmiController.get(deviceId).subscribe(
SubscribeRequest.newBuilder()
.setSubscribe(subscriptionList)
.build());