Add support for enabling/disabling ports for gNMI devices
This change also includes:
- Refactoring of gNMI protocol+driver to take advantage of the recent
changes to the gRPC protocol subsystem (e.g. no more locking, start RPC
with timeouts, etc.).
- Fixed Stratum driver to work after GeneralDeviceProvider refactoring
- Updated bmv2.py to generate ChassisConfig for stratum_bmv2
- Fixed portstate command to use the same port name as in the store
Change-Id: I0dad3bc73e4b6d907b5cf6b7b9a2852943226be7
diff --git a/cli/src/main/java/org/onosproject/cli/net/DevicePortStateCommand.java b/cli/src/main/java/org/onosproject/cli/net/DevicePortStateCommand.java
index 543bffb..bdf1c65 100644
--- a/cli/src/main/java/org/onosproject/cli/net/DevicePortStateCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/DevicePortStateCommand.java
@@ -66,9 +66,9 @@
return;
}
if ("enable".equals(portState)) {
- deviceAdminService.changePortState(dev.id(), pnum, true);
+ deviceAdminService.changePortState(dev.id(), p.number(), true);
} else if ("disable".equals(portState)) {
- deviceAdminService.changePortState(dev.id(), pnum, false);
+ deviceAdminService.changePortState(dev.id(), p.number(), false);
} else {
print(" %s", "State must be enable or disable");
}
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 0ffe615..a967c70 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -726,7 +726,8 @@
deviceId,
portDescription);
if (event != null) {
- log.info("Device {} port {} status changed", deviceId, event.port().number());
+ log.info("Device {} port {} status changed (enabled={})",
+ deviceId, event.port().number(), portDescription.isEnabled());
post(event);
}
}
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index 9653091..29deb4f 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -25,6 +25,7 @@
import org.onosproject.net.DeviceId;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.basics.BasicDeviceConfig;
+import org.onosproject.net.device.PortStatisticsDiscovery;
import org.onosproject.net.driver.Behaviour;
import org.onosproject.net.driver.DefaultDriver;
import org.onosproject.net.driver.Driver;
@@ -283,6 +284,20 @@
new HashMap<>();
pipeconf.behaviours().forEach(
b -> behaviours.put(b, pipeconf.implementation(b).get()));
+
+ // FIXME: remove this check when stratum_bmv2 will be open source and we
+ // will no longer need to read port counters from the p4 program. Here
+ // we ignore the PortStatisticsDiscovery behaviour from the pipeconf if
+ // the base driver (e.g. stratum with gnmi) already has it. But in
+ // general, we should give higher priority to pipeconf behaviours.
+ if (baseDriver.hasBehaviour(PortStatisticsDiscovery.class)
+ && behaviours.remove(PortStatisticsDiscovery.class) != null) {
+ log.warn("Ignoring {} behaviour from pipeconf {}, but using " +
+ "the one provided by {} driver...",
+ PortStatisticsDiscovery.class.getSimpleName(), pipeconfId,
+ baseDriver.name());
+ }
+
final Driver piPipeconfDriver = new DefaultDriver(
newDriverName, baseDriver.parents(),
baseDriver.manufacturer(), baseDriver.hwVersion(),
diff --git a/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java b/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
index de778a0..6dd861d 100644
--- a/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
@@ -35,6 +35,7 @@
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.device.PortStatisticsDiscovery;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
import org.onosproject.net.driver.Behaviour;
import org.onosproject.net.driver.Driver;
@@ -160,7 +161,13 @@
Set<Class<? extends Behaviour>> expectedBehaviours = Sets.newHashSet();
expectedBehaviours.addAll(BASIC_PIPECONF.behaviours());
expectedBehaviours.addAll(baseDriver.behaviours());
- assertEquals("The driver contains wrong behaviours", expectedBehaviours, driver.behaviours());
+
+ // FIXME: remove when stratum_bmv2 will be open source
+ // (see PiPipeconfManager)
+ // expectedBehaviours.remove(PortStatisticsDiscovery.class);
+
+ assertEquals("The driver contains wrong behaviours",
+ expectedBehaviours, driver.behaviours());
}
private class MockNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
@@ -256,7 +263,8 @@
@Override
public Set<Class<? extends Behaviour>> behaviours() {
- return ImmutableSet.of(DeviceDescriptionDiscovery.class);
+ return ImmutableSet.of(DeviceDescriptionDiscovery.class,
+ PortStatisticsDiscovery.class);
}
@Override
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java
index 4ce9f18..3774bfb 100644
--- a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java
+++ b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java
@@ -17,7 +17,6 @@
package org.onosproject.drivers.gnmi;
import com.google.common.base.Strings;
-import io.grpc.StatusRuntimeException;
import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiClientKey;
import org.onosproject.gnmi.api.GnmiController;
@@ -32,20 +31,12 @@
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
/**
* Abstract implementation of a behaviour handler for a gNMI device.
*/
public class AbstractGnmiHandlerBehaviour extends AbstractHandlerBehaviour {
- // Default timeout in seconds for device operations.
- private static final String DEVICE_REQ_TIMEOUT = "deviceRequestTimeout";
- private static final int DEFAULT_DEVICE_REQ_TIMEOUT = 60;
-
protected final Logger log = LoggerFactory.getLogger(getClass());
protected DeviceId deviceId;
protected DeviceService deviceService;
@@ -95,58 +86,4 @@
return null;
}
}
-
- /**
- * Returns the device request timeout driver property, or a default value
- * if the property is not present or cannot be parsed.
- *
- * @return timeout value
- */
- private int getDeviceRequestTimeout() {
- final String timeout = handler().driver()
- .getProperty(DEVICE_REQ_TIMEOUT);
- if (timeout == null) {
- return DEFAULT_DEVICE_REQ_TIMEOUT;
- } else {
- try {
- return Integer.parseInt(timeout);
- } catch (NumberFormatException e) {
- log.error("{} driver property '{}' is not a number, using default value {}",
- DEVICE_REQ_TIMEOUT, timeout, DEFAULT_DEVICE_REQ_TIMEOUT);
- return DEFAULT_DEVICE_REQ_TIMEOUT;
- }
- }
- }
-
- /**
- * Convenience method to get the result of a completable future while
- * setting a timeout and checking for exceptions.
- *
- * @param future completable future
- * @param opDescription operation description to use in log messages. Should
- * be a sentence starting with a verb ending in -ing,
- * e.g. "reading...", "writing...", etc.
- * @param defaultValue value to return if operation fails
- * @param <U> type of returned value
- * @return future result or default value
- */
- <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
- U defaultValue) {
- try {
- return future.get(getDeviceRequestTimeout(), TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.error("Exception while {} on {}", opDescription, deviceId);
- } catch (ExecutionException e) {
- final Throwable cause = e.getCause();
- if (cause instanceof StatusRuntimeException) {
- final StatusRuntimeException grpcError = (StatusRuntimeException) cause;
- log.warn("Error while {} on {}: {}", opDescription, deviceId, grpcError.getMessage());
- } else {
- log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
- }
- } catch (TimeoutException e) {
- log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
- }
- return defaultValue;
- }
}
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiDeviceDescriptionDiscovery.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiDeviceDescriptionDiscovery.java
index e56f2f3..ce134ad 100644
--- a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiDeviceDescriptionDiscovery.java
+++ b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiDeviceDescriptionDiscovery.java
@@ -18,11 +18,16 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
import gnmi.Gnmi;
import gnmi.Gnmi.GetRequest;
import gnmi.Gnmi.GetResponse;
+import org.onlab.packet.ChassisId;
+import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
@@ -49,11 +54,24 @@
private static final Logger log = LoggerFactory
.getLogger(OpenConfigGnmiDeviceDescriptionDiscovery.class);
- private static final String LAST_CHANGE = "last-change";
+ private static final String LAST_CHANGE = "last-changed";
+
+ private static final String UNKNOWN = "unknown";
@Override
public DeviceDescription discoverDeviceDetails() {
- return null;
+ return new DefaultDeviceDescription(
+ data().deviceId().uri(),
+ Device.Type.SWITCH,
+ data().driver().manufacturer(),
+ data().driver().hwVersion(),
+ data().driver().swVersion(),
+ UNKNOWN,
+ new ChassisId(),
+ true,
+ DefaultAnnotations.builder()
+ .set(AnnotationKeys.PROTOCOL, "gNMI")
+ .build());
}
@Override
@@ -63,9 +81,7 @@
}
log.debug("Discovering port details on device {}", handler().data().deviceId());
- final GetResponse response = getFutureWithDeadline(
- client.get(buildPortStateRequest()),
- "getting port details", GetResponse.getDefaultInstance());
+ final GetResponse response = Futures.getUnchecked(client.get(buildPortStateRequest()));
final Map<String, DefaultPortDescription.Builder> ports = Maps.newHashMap();
final Map<String, DefaultAnnotations.Builder> annotations = Maps.newHashMap();
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortAdminBehaviour.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortAdminBehaviour.java
new file mode 100644
index 0000000..4f050b0
--- /dev/null
+++ b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortAdminBehaviour.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.gnmi;
+
+import gnmi.Gnmi;
+import org.onosproject.gnmi.api.GnmiClient;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.PortAdmin;
+
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+/**
+ * Implementation of PortAdmin for gNMI devices with OpenConfig support.
+ */
+public class OpenConfigGnmiPortAdminBehaviour
+ extends AbstractGnmiHandlerBehaviour
+ implements PortAdmin {
+
+ @Override
+ public CompletableFuture<Boolean> enable(PortNumber number) {
+ doEnable(number, true);
+ // Always returning true is OK assuming this is used only by the
+ // GeneralDeviceProvider, which ignores the return value and instead
+ // waits for a gNMI Update over the Subscribe RPC.
+ return completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> disable(PortNumber number) {
+ doEnable(number, false);
+ return completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEnabled(PortNumber number) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ private void doEnable(PortNumber portNumber, boolean enabled) {
+ if (portNumber.isLogical()) {
+ log.warn("Cannot update port status for logical port {} on {}",
+ portNumber, deviceId);
+ return;
+ }
+ final GnmiClient client = getClientByKey();
+ if (client == null) {
+ log.warn("Cannot update ports on {}, missing gNMI client", deviceId);
+ return;
+ }
+ final Gnmi.Path path = Gnmi.Path.newBuilder()
+ .addElem(Gnmi.PathElem.newBuilder().setName("interfaces").build())
+ .addElem(Gnmi.PathElem.newBuilder().setName("interface")
+ .putKey("name", portNumber.name()).build())
+ .addElem(Gnmi.PathElem.newBuilder().setName("config").build())
+ .addElem(Gnmi.PathElem.newBuilder().setName("enabled").build())
+ .build();
+ final Gnmi.TypedValue value = Gnmi.TypedValue.newBuilder()
+ .setBoolVal(enabled)
+ .build();
+ final Gnmi.SetRequest request = Gnmi.SetRequest.newBuilder()
+ .addUpdate(Gnmi.Update.newBuilder()
+ .setPath(path)
+ .setVal(value)
+ .build())
+ .build();
+
+ // Async submit request and forget about it. In case of errors, the
+ // client will log them. In case of success, we should receive a gNMI
+ // Update over the Subscribe RPC with the new oper status.
+ client.set(request);
+ }
+}
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortStatisticsDiscovery.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortStatisticsDiscovery.java
index a9481a4..a41144e 100644
--- a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortStatisticsDiscovery.java
+++ b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiPortStatisticsDiscovery.java
@@ -17,6 +17,7 @@
package org.onosproject.drivers.gnmi;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
import gnmi.Gnmi;
import gnmi.Gnmi.GetRequest;
import gnmi.Gnmi.GetResponse;
@@ -45,7 +46,7 @@
private static final Map<Pair<DeviceId, PortNumber>, Long> PORT_START_TIMES =
Maps.newConcurrentMap();
- private static final String LAST_CHANGE = "last-change";
+ private static final String LAST_CHANGE = "last-changed";
@Override
public Collection<PortStatistics> discoverPortStatistics() {
@@ -67,10 +68,7 @@
ifacePortNumberMapping.put(portName, port.number());
});
- GetResponse getResponse =
- getFutureWithDeadline(client.get(getRequest.build()),
- "getting port counters",
- GetResponse.getDefaultInstance());
+ GetResponse getResponse = Futures.getUnchecked(client.get(getRequest.build()));
Map<String, Long> inPkts = Maps.newHashMap();
Map<String, Long> outPkts = Maps.newHashMap();
diff --git a/drivers/gnmi/src/main/resources/gnmi-drivers.xml b/drivers/gnmi/src/main/resources/gnmi-drivers.xml
index 9af162f..0928284 100644
--- a/drivers/gnmi/src/main/resources/gnmi-drivers.xml
+++ b/drivers/gnmi/src/main/resources/gnmi-drivers.xml
@@ -22,6 +22,8 @@
impl="org.onosproject.drivers.gnmi.GnmiHandshaker"/>
<behaviour api="org.onosproject.net.device.PortStatisticsDiscovery"
impl="org.onosproject.drivers.gnmi.OpenConfigGnmiPortStatisticsDiscovery"/>
+ <behaviour api="org.onosproject.net.behaviour.PortAdmin"
+ impl="org.onosproject.drivers.gnmi.OpenConfigGnmiPortAdminBehaviour"/>
</driver>
</drivers>
diff --git a/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/AbstractStratumBehaviour.java b/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/AbstractStratumBehaviour.java
new file mode 100644
index 0000000..b7c56e9
--- /dev/null
+++ b/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/AbstractStratumBehaviour.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.stratum;
+
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.driver.DriverData;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.driver.HandlerBehaviour;
+
+/**
+ * Abstract implementation of a driver behaviour for Stratum devices that
+ * provides access to protocol-specific implementations of the same behavior.
+ *
+ * @param <B> type of behaviour
+ */
+public abstract class AbstractStratumBehaviour<B extends HandlerBehaviour>
+ extends AbstractHandlerBehaviour {
+
+ protected B p4runtime;
+ protected B gnmi;
+
+ public AbstractStratumBehaviour(B p4runtime, B gnmi) {
+ this.p4runtime = p4runtime;
+ this.gnmi = gnmi;
+ }
+
+ @Override
+ public void setHandler(DriverHandler handler) {
+ super.setHandler(handler);
+ p4runtime.setHandler(handler);
+ gnmi.setHandler(handler);
+ }
+
+ @Override
+ public void setData(DriverData data) {
+ super.setData(data);
+ p4runtime.setData(data);
+ gnmi.setData(data);
+ }
+}
diff --git a/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumDeviceDescriptionDiscovery.java b/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumDeviceDescriptionDiscovery.java
new file mode 100644
index 0000000..4a65e26
--- /dev/null
+++ b/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumDeviceDescriptionDiscovery.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.stratum;
+
+import org.onosproject.drivers.gnmi.OpenConfigGnmiDeviceDescriptionDiscovery;
+import org.onosproject.drivers.p4runtime.P4RuntimeDeviceDescriptionDiscovery;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
+import org.onosproject.net.device.DefaultDeviceDescription;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceDescriptionDiscovery;
+import org.onosproject.net.device.PortDescription;
+
+import java.util.List;
+
+import static java.lang.String.format;
+
+/**
+ * Implementation of DeviceDescriptionDiscovery for Stratum devices.
+ */
+public class StratumDeviceDescriptionDiscovery
+ extends AbstractStratumBehaviour<DeviceDescriptionDiscovery>
+ implements DeviceDescriptionDiscovery {
+
+ private static final String UNKNOWN = "unknown";
+
+
+ public StratumDeviceDescriptionDiscovery() {
+ super(new P4RuntimeDeviceDescriptionDiscovery(),
+ new OpenConfigGnmiDeviceDescriptionDiscovery());
+ }
+
+ @Override
+ public DeviceDescription discoverDeviceDetails() {
+ final DeviceDescription p4Descr = p4runtime.discoverDeviceDetails();
+ final DeviceDescription gnmiDescr = gnmi.discoverDeviceDetails();
+ return new DefaultDeviceDescription(
+ data().deviceId().uri(),
+ Device.Type.SWITCH,
+ data().driver().manufacturer(),
+ data().driver().hwVersion(),
+ data().driver().swVersion(),
+ UNKNOWN,
+ p4Descr.chassisId(),
+ // Availability is mandated by P4Runtime.
+ p4Descr.isDefaultAvailable(),
+ DefaultAnnotations.builder()
+ .set(AnnotationKeys.PROTOCOL, format(
+ "%s, %s",
+ p4Descr.annotations().value(AnnotationKeys.PROTOCOL),
+ gnmiDescr.annotations().value(AnnotationKeys.PROTOCOL)))
+ .build());
+ }
+
+ @Override
+ public List<PortDescription> discoverPortDetails() {
+ return gnmi.discoverPortDetails();
+ }
+}
diff --git a/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumHandshaker.java b/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumHandshaker.java
index 8762af0..8ce68d7 100644
--- a/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumHandshaker.java
+++ b/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumHandshaker.java
@@ -21,9 +21,6 @@
import org.onosproject.net.MastershipRole;
import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.device.DeviceHandshaker;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
-import org.onosproject.net.driver.DriverData;
-import org.onosproject.net.driver.DriverHandler;
import org.onosproject.net.provider.ProviderId;
import java.util.concurrent.CompletableFuture;
@@ -31,33 +28,20 @@
/**
* Implementation of DeviceHandshaker for Stratum device.
*/
-public class StratumHandshaker extends AbstractHandlerBehaviour implements DeviceHandshaker {
-
- private P4RuntimeHandshaker p4runtime;
- private GnmiHandshaker gnmi;
+public class StratumHandshaker
+ extends AbstractStratumBehaviour<DeviceHandshaker>
+ implements DeviceHandshaker {
public StratumHandshaker() {
- p4runtime = new P4RuntimeHandshaker();
- gnmi = new GnmiHandshaker();
- }
-
- @Override
- public void setHandler(DriverHandler handler) {
- super.setHandler(handler);
- p4runtime.setHandler(handler);
- gnmi.setHandler(handler);
- }
-
- @Override
- public void setData(DriverData data) {
- super.setData(data);
- p4runtime.setData(data);
- gnmi.setData(data);
+ super(new P4RuntimeHandshaker(), new GnmiHandshaker());
}
@Override
public boolean isReachable() {
- return p4runtime.isReachable() && gnmi.isReachable();
+ // Reachability is mainly used for mastership contests and it's a
+ // prerequisite for availability. We can probably live without gNMI and
+ // gNOI, but we will always need P4Runtime.
+ return p4runtime.isReachable();
}
@Override
diff --git a/drivers/stratum/src/main/resources/stratum-drivers.xml b/drivers/stratum/src/main/resources/stratum-drivers.xml
index fa0c0c4..f854d5f 100644
--- a/drivers/stratum/src/main/resources/stratum-drivers.xml
+++ b/drivers/stratum/src/main/resources/stratum-drivers.xml
@@ -19,6 +19,8 @@
hwVersion="master" swVersion="Stratum" extends="p4runtime,gnmi">
<behaviour api="org.onosproject.net.device.DeviceHandshaker"
impl="org.onosproject.drivers.stratum.StratumHandshaker"/>
+ <behaviour api="org.onosproject.net.device.DeviceDescriptionDiscovery"
+ impl="org.onosproject.drivers.stratum.StratumDeviceDescriptionDiscovery"/>
</driver>
<driver name="stratum-dummy" manufacturer="Open Networking Foundation"
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
index 796ef55..5d14bbc 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
@@ -38,7 +38,7 @@
*
* @return the capability response
*/
- CompletableFuture<CapabilityResponse> capability();
+ CompletableFuture<CapabilityResponse> capabilities();
/**
* Retrieves a snapshot of data from the device.
@@ -57,15 +57,17 @@
CompletableFuture<SetResponse> set(SetRequest request);
/**
- * Subscribes to a given specific gNMI path.
+ * Starts a subscription for the given request. Updates will be notified by
+ * the controller via {@link GnmiEvent.Type#UPDATE} events. The client
+ * guarantees that a Subscription RPC is active at all times despite channel
+ * or server failures, unless {@link #unsubscribe()} is called.
*
* @param request the subscribe request
- * @return true if subscribe successfully; false otherwise
*/
- boolean subscribe(SubscribeRequest request);
+ void subscribe(SubscribeRequest request);
/**
- * Terminates the subscription channel of this device.
+ * Terminates any Subscribe RPC active.
*/
- void terminateSubscriptionChannel();
+ void unsubscribe();
}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
index 6645f39..8d93a63 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -28,107 +28,157 @@
import gnmi.gNMIGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiClientKey;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
-import org.slf4j.Logger;
import java.util.concurrent.CompletableFuture;
-
-import static org.slf4j.LoggerFactory.getLogger;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
/**
* Implementation of gNMI client.
*/
public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
- private static final PathElem DUMMY_PATH_ELEM = PathElem.newBuilder().setName("onos-gnmi-test").build();
- private static final Path DUMMY_PATH = Path.newBuilder().addElem(DUMMY_PATH_ELEM).build();
- private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
- private final Logger log = getLogger(getClass());
- private final gNMIGrpc.gNMIBlockingStub blockingStub;
- private GnmiSubscriptionManager gnmiSubscriptionManager;
+
+ private static final int RPC_TIMEOUT_SECONDS = 10;
+
+ private static final GetRequest PING_REQUEST = GetRequest.newBuilder().addPath(
+ Path.newBuilder().addElem(
+ PathElem.newBuilder().setName("onos-gnmi-ping").build()
+ ).build()).build();
+
+ private GnmiSubscriptionManager subscribeManager;
GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
super(clientKey, managedChannel, false, controller);
- this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
- this.gnmiSubscriptionManager =
- new GnmiSubscriptionManager(managedChannel, deviceId, controller);
+ this.subscribeManager =
+ new GnmiSubscriptionManager(this, deviceId, controller);
}
@Override
- public CompletableFuture<CapabilityResponse> capability() {
- return supplyInContext(this::doCapability, "capability");
+ public CompletableFuture<CapabilityResponse> capabilities() {
+ final CompletableFuture<CapabilityResponse> future = new CompletableFuture<>();
+ execRpc(s -> s.capabilities(
+ CapabilityRequest.getDefaultInstance(),
+ unaryObserver(future, CapabilityResponse.getDefaultInstance(),
+ "capabilities request"))
+ );
+ return future;
}
@Override
public CompletableFuture<GetResponse> get(GetRequest request) {
- return supplyInContext(() -> doGet(request), "get");
+ final CompletableFuture<GetResponse> future = new CompletableFuture<>();
+ execRpc(s -> s.get(request, unaryObserver(
+ future, GetResponse.getDefaultInstance(), "GET"))
+ );
+ return future;
}
@Override
public CompletableFuture<SetResponse> set(SetRequest request) {
- return supplyInContext(() -> doSet(request), "set");
+ final CompletableFuture<SetResponse> future = new CompletableFuture<>();
+ execRpc(s -> s.set(request, unaryObserver(
+ future, SetResponse.getDefaultInstance(), "SET"))
+ );
+ return future;
+ }
+
+ private <RES> StreamObserver<RES> unaryObserver(
+ final CompletableFuture<RES> future,
+ final RES defaultResponse,
+ final String opDescription) {
+ return new StreamObserver<RES>() {
+ @Override
+ public void onNext(RES value) {
+ future.complete(value);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ handleRpcError(t, opDescription);
+ future.complete(defaultResponse);
+ }
+
+ @Override
+ public void onCompleted() {
+ // Ignore. Unary call.
+ }
+ };
}
@Override
- public boolean subscribe(SubscribeRequest request) {
- return gnmiSubscriptionManager.subscribe(request);
+ public void subscribe(SubscribeRequest request) {
+ subscribeManager.subscribe(request);
}
@Override
- public void terminateSubscriptionChannel() {
- gnmiSubscriptionManager.complete();
+ public void unsubscribe() {
+ subscribeManager.unsubscribe();
}
@Override
public CompletableFuture<Boolean> probeService() {
- return supplyInContext(this::doServiceAvailable, "isServiceAvailable");
+ final CompletableFuture<Boolean> future = new CompletableFuture<>();
+ final StreamObserver<GetResponse> responseObserver = new StreamObserver<GetResponse>() {
+ @Override
+ public void onNext(GetResponse value) {
+ future.complete(true);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // This gRPC call should throw INVALID_ARGUMENT status exception
+ // since "/onos-gnmi-ping" path does not exists in any config
+ // model For other status code such as UNIMPLEMENT, means the
+ // gNMI service is not available on the device.
+ future.complete(Status.fromThrowable(t).getCode()
+ == Status.Code.INVALID_ARGUMENT);
+ }
+
+ @Override
+ public void onCompleted() {
+ // Ignore. Unary call.
+ }
+ };
+ execRpc(s -> s.get(PING_REQUEST, responseObserver));
+ return future;
}
@Override
- protected Void doShutdown() {
- gnmiSubscriptionManager.shutdown();
- return super.doShutdown();
+ public void shutdown() {
+ subscribeManager.shutdown();
+ super.shutdown();
}
- private CapabilityResponse doCapability() {
- CapabilityRequest request = CapabilityRequest.newBuilder().build();
- try {
- return blockingStub.capabilities(request);
- } catch (StatusRuntimeException e) {
- log.warn("Unable to get capability from {}: {}", deviceId, e.getMessage());
- return CapabilityResponse.getDefaultInstance();
+ /**
+ * Forces execution of an RPC in a cancellable context with a timeout.
+ *
+ * @param stubConsumer P4Runtime stub consumer
+ */
+ private void execRpc(Consumer<gNMIGrpc.gNMIStub> stubConsumer) {
+ if (log.isTraceEnabled()) {
+ log.trace("Executing RPC with timeout {} seconds (context deadline {})...",
+ RPC_TIMEOUT_SECONDS, context().getDeadline());
}
+ runInCancellableContext(() -> stubConsumer.accept(
+ gNMIGrpc.newStub(channel)
+ .withDeadlineAfter(RPC_TIMEOUT_SECONDS, TimeUnit.SECONDS)));
}
- private GetResponse doGet(GetRequest request) {
- try {
- return blockingStub.get(request);
- } catch (StatusRuntimeException e) {
- log.warn("Unable to get data from {}: {}", deviceId, e.getMessage());
- return GetResponse.getDefaultInstance();
+ /**
+ * Forces execution of an RPC in a cancellable context with no timeout.
+ *
+ * @param stubConsumer P4Runtime stub consumer
+ */
+ void execRpcNoTimeout(Consumer<gNMIGrpc.gNMIStub> stubConsumer) {
+ if (log.isTraceEnabled()) {
+ log.trace("Executing RPC with no timeout (context deadline {})...",
+ context().getDeadline());
}
- }
-
- private SetResponse doSet(SetRequest request) {
- try {
- return blockingStub.set(request);
- } catch (StatusRuntimeException e) {
- log.warn("Unable to set data to {}: {}", deviceId, e.getMessage());
- return SetResponse.getDefaultInstance();
- }
- }
-
- private boolean doServiceAvailable() {
- try {
- return blockingStub.get(DUMMY_REQUEST) != null;
- } catch (StatusRuntimeException e) {
- // This gRPC call should throw INVALID_ARGUMENT status exception
- // since "/onos-gnmi-test" path does not exists in any config model
- // For other status code such as UNIMPLEMENT, means the gNMI
- // service is not available on the device.
- return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
- }
+ runInCancellableContext(() -> stubConsumer.accept(
+ gNMIGrpc.newStub(channel)));
}
}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
index 156cb43..73bb807 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
@@ -17,9 +17,8 @@
package org.onosproject.gnmi.ctl;
+import com.google.common.util.concurrent.Futures;
import gnmi.Gnmi;
-import gnmi.gNMIGrpc;
-import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
@@ -29,9 +28,10 @@
import org.slf4j.Logger;
import java.net.ConnectException;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
import static java.lang.String.format;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -39,108 +39,110 @@
import static org.slf4j.LoggerFactory.getLogger;
/**
- * A manager for the gNMI stream channel that opportunistically creates
- * new stream RCP stubs (e.g. when one fails because of errors) and posts
- * subscribe events via the gNMI controller.
+ * A manager for the gNMI Subscribe RPC that opportunistically starts new RPC
+ * (e.g. when one fails because of errors) and posts subscribe events via the
+ * gNMI controller.
*/
final class GnmiSubscriptionManager {
- /**
- * The state of the subscription manager.
- */
- enum State {
-
- /**
- * Subscription not exists.
- */
- INIT,
-
- /**
- * Exists a subscription and channel opened.
- */
- SUBSCRIBED,
-
- /**
- * Exists a subscription, but the channel does not open.
- */
- RETRYING,
- }
-
// FIXME: make this configurable
private static final long DEFAULT_RECONNECT_DELAY = 5; // Seconds
+
private static final Logger log = getLogger(GnmiSubscriptionManager.class);
- private final ManagedChannel channel;
+
+ private final GnmiClientImpl client;
private final DeviceId deviceId;
private final GnmiControllerImpl controller;
-
private final StreamObserver<Gnmi.SubscribeResponse> responseObserver;
- private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
+
+ private final ScheduledExecutorService streamCheckerExecutor =
+ newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-subscribe-check", "%d", log));
+ private Future<?> checkTask;
private ClientCallStreamObserver<Gnmi.SubscribeRequest> requestObserver;
private Gnmi.SubscribeRequest existingSubscription;
- private final ScheduledExecutorService streamCheckerExecutor =
- newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-probe", "%d", log));
+ private AtomicBoolean active = new AtomicBoolean(false);
- GnmiSubscriptionManager(ManagedChannel channel, DeviceId deviceId,
+ GnmiSubscriptionManager(GnmiClientImpl client, DeviceId deviceId,
GnmiControllerImpl controller) {
- this.channel = channel;
+ this.client = client;
this.deviceId = deviceId;
this.controller = controller;
this.responseObserver = new InternalStreamResponseObserver();
- streamCheckerExecutor.scheduleAtFixedRate(this::checkGnmiStream, 0,
- DEFAULT_RECONNECT_DELAY,
- TimeUnit.SECONDS);
+ }
+
+ void subscribe(Gnmi.SubscribeRequest request) {
+ synchronized (this) {
+ if (existingSubscription != null) {
+ if (existingSubscription.equals(request)) {
+ // Nothing to do. We are already subscribed for the same
+ // request.
+ log.debug("Ignoring re-subscription to same request",
+ deviceId);
+ return;
+ }
+ log.debug("Cancelling existing subscription for {} before " +
+ "starting a new one", deviceId);
+ complete();
+ }
+ existingSubscription = request;
+ sendSubscribeRequest();
+ if (checkTask != null) {
+ checkTask = streamCheckerExecutor.scheduleAtFixedRate(
+ this::checkSubscription, 0,
+ DEFAULT_RECONNECT_DELAY,
+ TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ void unsubscribe() {
+ synchronized (this) {
+ if (checkTask != null) {
+ checkTask.cancel(false);
+ checkTask = null;
+ }
+ existingSubscription = null;
+ complete();
+ }
}
public void shutdown() {
- log.info("gNMI subscription manager for device {} shutdown", deviceId);
- streamCheckerExecutor.shutdown();
- complete();
+ log.debug("Shutting down gNMI subscription manager for {}", deviceId);
+ unsubscribe();
+ streamCheckerExecutor.shutdownNow();
}
- private void initIfRequired() {
+ private void checkSubscription() {
+ synchronized (this) {
+ if (existingSubscription != null && !active.get()) {
+ if (client.isServerReachable() || Futures.getUnchecked(client.probeService())) {
+ log.info("Re-starting Subscribe RPC for {}...", deviceId);
+ sendSubscribeRequest();
+ } else {
+ log.debug("Not restarting Subscribe RPC for {}, server is NOT reachable",
+ deviceId);
+ }
+ }
+ }
+ }
+
+ private void sendSubscribeRequest() {
if (requestObserver == null) {
- log.debug("Creating new stream channel for {}...", deviceId);
- requestObserver = (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
- gNMIGrpc.newStub(channel).subscribe(responseObserver);
-
+ log.debug("Starting new Subscribe RPC for {}...", deviceId);
+ client.execRpcNoTimeout(
+ s -> requestObserver =
+ (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
+ s.subscribe(responseObserver)
+ );
}
- }
-
- boolean subscribe(Gnmi.SubscribeRequest request) {
- synchronized (state) {
- if (state.get() == State.SUBSCRIBED) {
- // Cancel subscription when we need to subscribe new thing
- complete();
- }
-
- existingSubscription = request;
- return send(request);
- }
- }
-
- private boolean send(Gnmi.SubscribeRequest value) {
- initIfRequired();
- try {
- requestObserver.onNext(value);
- state.set(State.SUBSCRIBED);
- return true;
- } catch (Throwable ex) {
- if (ex instanceof StatusRuntimeException) {
- log.warn("Unable to send subscribe request to {}: {}",
- deviceId, ex.getMessage());
- } else {
- log.warn("Exception while sending subscribe request to {}",
- deviceId, ex);
- }
- state.set(State.RETRYING);
- return false;
- }
+ requestObserver.onNext(existingSubscription);
+ active.set(true);
}
public void complete() {
- synchronized (state) {
- state.set(State.INIT);
+ synchronized (this) {
+ active.set(false);
if (requestObserver != null) {
requestObserver.onCompleted();
requestObserver.cancel("Terminated", null);
@@ -149,21 +151,8 @@
}
}
- private void checkGnmiStream() {
- synchronized (state) {
- if (state.get() != State.RETRYING) {
- // No need to retry if the state is not RETRYING
- return;
- }
- log.info("Try reconnecting gNMI stream to device {}", deviceId);
-
- complete();
- send(existingSubscription);
- }
- }
-
/**
- * Handles messages received from the device on the stream channel.
+ * Handles messages received from the device on the Subscribe RPC.
*/
private final class InternalStreamResponseObserver
implements StreamObserver<Gnmi.SubscribeResponse> {
@@ -171,41 +160,51 @@
@Override
public void onNext(Gnmi.SubscribeResponse message) {
try {
- log.debug("Received message on stream channel from {}: {}",
- deviceId, message.toString());
- GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
- GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
- controller.postEvent(event);
+ if (log.isTraceEnabled()) {
+ log.trace("Received SubscribeResponse from {}: {}",
+ deviceId, message.toString());
+ }
+ controller.postEvent(new GnmiEvent(GnmiEvent.Type.UPDATE, new GnmiUpdate(
+ deviceId, message.getUpdate(), message.getSyncResponse())));
} catch (Throwable ex) {
- log.error("Exception while processing stream message from {}",
- deviceId, ex);
+ log.error("Exception processing SubscribeResponse from " + deviceId,
+ ex);
}
}
@Override
public void onError(Throwable throwable) {
+ complete();
if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException) throwable;
if (sre.getStatus().getCause() instanceof ConnectException) {
- log.warn("Device {} is unreachable ({})",
- deviceId, sre.getCause().getMessage());
+ log.warn("{} is unreachable ({})",
+ deviceId, sre.getCause().getMessage());
} else {
- log.warn("Received error on stream channel for {}: {}",
- deviceId, throwable.getMessage());
+ log.warn("Error on Subscribe RPC for {}: {}",
+ deviceId, throwable.getMessage());
}
} else {
- log.warn(format("Received exception on stream channel for %s",
- deviceId), throwable);
+ log.error(format("Exception on Subscribe RPC for %s",
+ deviceId), throwable);
}
- state.set(State.RETRYING);
}
@Override
public void onCompleted() {
- log.warn("Stream channel for {} has completed", deviceId);
- state.set(State.RETRYING);
+ complete();
+ log.warn("Subscribe RPC for {} has completed", deviceId);
}
}
+
+ @Override
+ protected void finalize() throws Throwable {
+ if (!streamCheckerExecutor.isShutdown()) {
+ log.error("Finalizing object but executor is still active! BUG? Shutting down...");
+ shutdown();
+ }
+ super.finalize();
+ }
}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
index d529529..7957006 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
@@ -28,11 +28,8 @@
/**
* Shutdowns the client by terminating any active RPC.
- *
- * @return a completable future to signal the completion of the shutdown
- * procedure
*/
- CompletableFuture<Void> shutdown();
+ void shutdown();
/**
* This method provides a coarse modelling of gRPC channel {@link
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
index 117c6e3..befa334 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -26,18 +26,10 @@
import org.onosproject.net.device.DeviceAgentEvent;
import org.slf4j.Logger;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onlab.util.Tools.groupedThreads;
+import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -45,18 +37,11 @@
*/
public abstract class AbstractGrpcClient implements GrpcClient {
- // Timeout in seconds to obtain the request lock.
- private static final int LOCK_TIMEOUT = 60;
- private static final int DEFAULT_THREAD_POOL_SIZE = 10;
-
protected final Logger log = getLogger(getClass());
- private final Lock requestLock = new ReentrantLock();
private final Context.CancellableContext cancellableContext =
Context.current().withCancellation();
- private final Executor contextExecutor;
- protected final ExecutorService executorService;
protected final DeviceId deviceId;
protected final ManagedChannel channel;
private final boolean persistent;
@@ -81,9 +66,6 @@
this.channel = channel;
this.persistent = persistent;
this.controller = controller;
- this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
- "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
- this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING);
}
@@ -106,27 +88,15 @@
}
@Override
- public CompletableFuture<Void> shutdown() {
+ public void shutdown() {
if (cancellableContext.isCancelled()) {
log.warn("Context is already cancelled, " +
"ignoring request to shutdown for {}...", deviceId);
- return CompletableFuture.completedFuture(null);
+ return;
}
- return CompletableFuture.supplyAsync(this::doShutdown);
- }
-
- protected Void doShutdown() {
log.warn("Shutting down client for {}...", deviceId);
cancellableContext.cancel(new InterruptedException(
"Requested client shutdown"));
- this.executorService.shutdownNow();
- try {
- executorService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("Executor service didn't shutdown in time.");
- Thread.currentThread().interrupt();
- }
- return null;
}
/**
@@ -152,66 +122,22 @@
return cancellableContext;
}
- /**
- * Equivalent of supplyWithExecutor using the gRPC context executor of this
- * client, such that if the context is cancelled (e.g. client shutdown) the
- * RPC is automatically cancelled.
- *
- * @param <U> return type of supplier
- * @param supplier the supplier to be executed
- * @param opDescription the description of this supplier
- * @return CompletableFuture includes the result of supplier
- * @throws IllegalStateException if client has been shut down
- */
- protected <U> CompletableFuture<U> supplyInContext(
- Supplier<U> supplier, String opDescription) {
- return supplyWithExecutor(supplier, opDescription, contextExecutor);
- }
-
- /**
- * Submits a task for async execution via the given executor. All tasks
- * submitted with this method will be executed sequentially.
- *
- * @param <U> return type of supplier
- * @param supplier the supplier to be executed
- * @param opDescription the description of this supplier
- * @param executor the executor to execute this supplier
- * @return CompletableFuture includes the result of supplier
- * @throws IllegalStateException if client has been shut down
- */
- private <U> CompletableFuture<U> supplyWithExecutor(
- Supplier<U> supplier, String opDescription, Executor executor) {
- if (this.cancellableContext.isCancelled()) {
- throw new IllegalStateException("Client has been shut down");
+ protected void handleRpcError(Throwable throwable, String opDescription) {
+ if (throwable instanceof StatusRuntimeException) {
+ final StatusRuntimeException sre = (StatusRuntimeException) throwable;
+ final String logMsg;
+ if (sre.getCause() == null) {
+ logMsg = sre.getMessage();
+ } else {
+ logMsg = format("%s (%s)", sre.getMessage(), sre.getCause().toString());
+ }
+ log.warn("Error while performing {} on {}: {}",
+ opDescription, deviceId, logMsg);
+ log.debug("", throwable);
+ return;
}
- return CompletableFuture.supplyAsync(() -> {
- // TODO: explore a more relaxed locking strategy.
- try {
- if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
- log.error("LOCK TIMEOUT! This is likely a deadlock, "
- + "please debug (executing {})",
- opDescription);
- throw new IllegalThreadStateException("Lock timeout");
- }
- } catch (InterruptedException e) {
- log.warn("Thread interrupted while waiting for lock (executing {})",
- opDescription);
- throw new IllegalStateException(e);
- }
- try {
- return supplier.get();
- } catch (StatusRuntimeException ex) {
- log.warn("Unable to execute {} on {}: {}",
- opDescription, deviceId, ex.toString());
- throw ex;
- } catch (Throwable ex) {
- log.error("Exception in client of {}, executing {}",
- deviceId, opDescription, ex);
- throw ex;
- } finally {
- requestLock.unlock();
- }
- }, executor);
+ log.error(format("Exception while performing %s on %s",
+ opDescription, deviceId), throwable);
}
private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
index d06e3fc..d54ef97 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
@@ -42,7 +42,6 @@
import java.util.function.Consumer;
import static com.google.common.base.Preconditions.checkNotNull;
-import static java.lang.String.format;
import static p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest.ResponseType.COOKIE_ONLY;
/**
@@ -96,9 +95,9 @@
}
@Override
- protected Void doShutdown() {
+ public void shutdown() {
streamClient.closeSession();
- return super.doShutdown();
+ super.shutdown();
}
@Override
@@ -191,6 +190,22 @@
return future;
}
+ @Override
+ protected void handleRpcError(Throwable throwable, String opDescription) {
+ if (throwable instanceof StatusRuntimeException) {
+ checkGrpcException((StatusRuntimeException) throwable);
+ }
+ super.handleRpcError(throwable, opDescription);
+ }
+
+ private void checkGrpcException(StatusRuntimeException sre) {
+ if (sre.getStatus().getCode() == Status.Code.PERMISSION_DENIED) {
+ // Notify upper layers that this node is not master.
+ controller.postEvent(new DeviceAgentEvent(
+ DeviceAgentEvent.Type.NOT_MASTER, deviceId));
+ }
+ }
+
/**
* Returns the P4Runtime-internal device ID associated with this client.
*
@@ -252,38 +267,4 @@
runInCancellableContext(() -> stubConsumer.accept(
P4RuntimeGrpc.newStub(channel)));
}
-
- /**
- * Logs the error and checks it for any condition that might be of interest
- * for the controller.
- *
- * @param throwable throwable
- * @param opDescription operation description for logging
- */
- void handleRpcError(Throwable throwable, String opDescription) {
- if (throwable instanceof StatusRuntimeException) {
- final StatusRuntimeException sre = (StatusRuntimeException) throwable;
- checkGrpcException(sre);
- final String logMsg;
- if (sre.getCause() == null) {
- logMsg = sre.getMessage();
- } else {
- logMsg = format("%s (%s)", sre.getMessage(), sre.getCause().toString());
- }
- log.warn("Error while performing {} on {}: {}",
- opDescription, deviceId, logMsg);
- log.debug("", throwable);
- return;
- }
- log.error(format("Exception while performing %s on %s",
- opDescription, deviceId), throwable);
- }
-
- private void checkGrpcException(StatusRuntimeException sre) {
- if (sre.getStatus().getCode() == Status.Code.PERMISSION_DENIED) {
- // Notify upper layers that this node is not master.
- controller.postEvent(new DeviceAgentEvent(
- DeviceAgentEvent.Type.NOT_MASTER, deviceId));
- }
- }
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
index 1bf195c..df4844b 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
@@ -303,25 +303,10 @@
void send(StreamMessageRequest value) {
synchronized (this) {
initIfRequired();
- doSend(value);
- }
- }
-
- private void doSend(StreamMessageRequest value) {
- try {
requestObserver.onNext(value);
// Optimistically set the session as open. In case of errors, it
// will be closed by the response stream observer.
streamChannelManager.signalOpen();
- } catch (Throwable ex) {
- if (ex instanceof StatusRuntimeException) {
- log.warn("Unable to send {} to {}: {}",
- value.getUpdateCase().toString(), deviceId, ex.getMessage());
- } else {
- log.error("Exception while sending {} to {}: {}",
- value.getUpdateCase().toString(), deviceId, ex);
- }
- teardown();
}
}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
index 6e164bf..a527642 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
@@ -17,7 +17,7 @@
package org.onosproject.provider.general.device.impl;
import com.google.common.annotations.Beta;
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Striped;
import gnmi.Gnmi.Notification;
import gnmi.Gnmi.Path;
@@ -27,7 +27,6 @@
import gnmi.Gnmi.SubscriptionList;
import gnmi.Gnmi.SubscriptionMode;
import gnmi.Gnmi.Update;
-import org.onlab.util.SharedExecutors;
import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiController;
import org.onosproject.gnmi.api.GnmiEvent;
@@ -40,6 +39,7 @@
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
@@ -49,10 +49,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
import java.util.List;
-import java.util.concurrent.ExecutorService;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
/**
* Entity that manages gNMI subscription for devices using OpenConfig models and
@@ -61,7 +63,7 @@
@Beta
class GnmiDeviceStateSubscriber {
- private static final String LAST_CHANGE = "last-change";
+ private static final String LAST_CHANGE = "last-changed";
private static Logger log = LoggerFactory.getLogger(GnmiDeviceStateSubscriber.class);
@@ -70,12 +72,10 @@
private final DeviceProviderService providerService;
private final MastershipService mastershipService;
- private final ExecutorService executorService = SharedExecutors.getPoolThreadExecutor();
-
private final InternalGnmiEventListener gnmiEventListener = new InternalGnmiEventListener();
private final InternalDeviceListener deviceEventListener = new InternalDeviceListener();
private final InternalMastershipListener mastershipListener = new InternalMastershipListener();
- private final Collection<DeviceId> deviceSubscribed = Sets.newHashSet();
+ private final Map<DeviceId, Set<PortNumber>> deviceSubscribed = Maps.newHashMap();
private final Striped<Lock> deviceLocks = Striped.lock(30);
@@ -93,34 +93,40 @@
mastershipService.addListener(mastershipListener);
gnmiController.addListener(gnmiEventListener);
// Subscribe to existing devices.
- deviceService.getDevices().forEach(d -> executorService.execute(
- () -> checkDeviceSubscription(d.id())));
+ deviceService.getDevices().forEach(d -> checkSubscription(d.id()));
}
public void deactivate() {
- deviceSubscribed.forEach(this::unsubscribeIfNeeded);
+ deviceSubscribed.keySet().forEach(this::unsubscribeIfNeeded);
deviceService.removeListener(deviceEventListener);
mastershipService.removeListener(mastershipListener);
gnmiController.removeListener(gnmiEventListener);
}
- private void checkDeviceSubscription(DeviceId deviceId) {
+ private void checkSubscription(DeviceId deviceId) {
+ if (gnmiController.getClient(deviceId) == null) {
+ // Ignore devices for which a gNMI client does not exist.
+ return;
+ }
deviceLocks.get(deviceId).lock();
try {
- if (!deviceService.isAvailable(deviceId)
- || deviceService.getDevice(deviceId) == null
- || !mastershipService.isLocalMaster(deviceId)) {
- // Device not available/removed or this instance is no longer
- // master.
- unsubscribeIfNeeded(deviceId);
- } else {
+ if (shouldHaveSubscription(deviceId)) {
subscribeIfNeeded(deviceId);
+ } else {
+ unsubscribeIfNeeded(deviceId);
}
} finally {
deviceLocks.get(deviceId).unlock();
}
}
+ private boolean shouldHaveSubscription(DeviceId deviceId) {
+ return deviceService.getDevice(deviceId) != null
+ && deviceService.isAvailable(deviceId)
+ && mastershipService.isLocalMaster(deviceId)
+ && !deviceService.getPorts(deviceId).isEmpty();
+ }
+
private Path interfaceOperStatusPath(String interfaceName) {
return Path.newBuilder()
.addElem(PathElem.newBuilder().setName("interfaces").build())
@@ -132,40 +138,31 @@
}
private void unsubscribeIfNeeded(DeviceId deviceId) {
- if (!deviceSubscribed.contains(deviceId)) {
- // Not subscribed.
- return;
+ gnmiController.getClient(deviceId).unsubscribe();
+ if (deviceSubscribed.remove(deviceId) != null) {
+ log.info("Cancelled gNMI subscription for {}", deviceId);
}
- GnmiClient client = gnmiController.getClient(deviceId);
- if (client == null) {
- log.debug("Cannot find gNMI client for device {}", deviceId);
- } else {
- client.terminateSubscriptionChannel();
- }
- deviceSubscribed.remove(deviceId);
}
private void subscribeIfNeeded(DeviceId deviceId) {
- if (deviceSubscribed.contains(deviceId)) {
- // Already subscribed.
- // FIXME: if a new port is added after the first subscription we are
- // not subscribing to the new port.
+
+ Set<PortNumber> ports = deviceService.getPorts(deviceId).stream()
+ .map(Port::number)
+ .collect(Collectors.toSet());
+
+ if (Objects.equals(ports, deviceSubscribed.get(deviceId))) {
+ // Already subscribed for the same ports.
return;
}
GnmiClient client = gnmiController.getClient(deviceId);
- if (client == null) {
- log.debug("Cannot find gNMI client for device {}, ignoring.", deviceId);
- return;
- }
- List<Port> ports = deviceService.getPorts(deviceId);
SubscriptionList.Builder subscriptionList = SubscriptionList.newBuilder();
subscriptionList.setMode(SubscriptionList.Mode.STREAM);
subscriptionList.setUpdatesOnly(true);
ports.forEach(port -> {
- String portName = port.number().name();
+ String portName = port.name();
// Subscribe /interface/interface[name=port-name]/state/oper-status
Path subscribePath = interfaceOperStatusPath(portName);
Subscription interfaceOperStatusSub =
@@ -183,7 +180,9 @@
client.subscribe(subscribeRequest);
- deviceSubscribed.add(deviceId);
+ log.info("Started gNMI subscription for {} ports on {}", ports.size(), deviceId);
+
+ deviceSubscribed.put(deviceId, ports);
}
private void handleGnmiUpdate(GnmiUpdate eventSubject) {
@@ -247,14 +246,16 @@
@Override
public void event(GnmiEvent event) {
- if (!deviceSubscribed.contains(event.subject().deviceId())) {
- log.warn("Received gNMI event from {}, but we are not subscribed to it",
+ if (!deviceSubscribed.containsKey(event.subject().deviceId())) {
+ log.warn("Received gNMI event from {}, but we did'nt expect to " +
+ "be subscribed to it! Discarding event...",
event.subject().deviceId());
+ return;
}
+
log.debug("Received gNMI event {}", event.toString());
if (event.type() == GnmiEvent.Type.UPDATE) {
- executorService.execute(
- () -> handleGnmiUpdate((GnmiUpdate) event.subject()));
+ handleGnmiUpdate((GnmiUpdate) event.subject());
} else {
log.debug("Unsupported gNMI event type: {}", event.type());
}
@@ -265,7 +266,7 @@
@Override
public void event(MastershipEvent event) {
- executorService.execute(() -> checkDeviceSubscription(event.subject()));
+ checkSubscription(event.subject());
}
}
@@ -278,8 +279,9 @@
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_UPDATED:
case DEVICE_REMOVED:
- executorService.execute(
- () -> checkDeviceSubscription(event.subject().id()));
+ case PORT_ADDED:
+ case PORT_REMOVED:
+ checkSubscription(event.subject().id());
break;
default:
break;
diff --git a/tools/dev/mininet/bmv2.py b/tools/dev/mininet/bmv2.py
index 3dc42fd..522a80c 100644
--- a/tools/dev/mininet/bmv2.py
+++ b/tools/dev/mininet/bmv2.py
@@ -143,6 +143,7 @@
self.dryrun = parseBoolean(dryrun)
self.valgrind = parseBoolean(valgrind)
self.netcfgfile = '/tmp/bmv2-%s-netcfg.json' % self.name
+ self.chassisConfigFile = '/tmp/bmv2-%s-chassis-config.txt' % self.name
self.pipeconfId = pipeconf
self.injectPorts = parseBoolean(portcfg)
self.withGnmi = parseBoolean(gnmi)
@@ -152,6 +153,7 @@
self.onosDeviceId = onosdevid
else:
self.onosDeviceId = "device:bmv2:%s" % self.name
+ self.p4DeviceId = BMV2_DEFAULT_DEVICE_ID
self.logfd = None
self.bmv2popen = None
self.stopped = True
@@ -176,8 +178,8 @@
basicCfg = {
"managementAddress": "grpc://%s:%d?device_id=%d" % (
- srcIP, self.grpcPort, BMV2_DEFAULT_DEVICE_ID),
- "driver": "bmv2",
+ srcIP, self.grpcPort, self.p4DeviceId),
+ "driver": "stratum-bmv2" if self.useStratum else "bmv2",
"pipeconf": self.pipeconfId
}
@@ -189,7 +191,7 @@
"basic": basicCfg
}
- if self.injectPorts:
+ if not self.useStratum and self.injectPorts:
portData = {}
portId = 1
for intfName in self.intfNames():
@@ -209,6 +211,40 @@
return cfgData
+ def chassisConfig(self):
+ config = """description: "BMv2 simple_switch {name}"
+chassis {{
+ platform: PLT_P4_SOFT_SWITCH
+ name: "{name}"
+}}
+nodes {{
+ id: {nodeId}
+ name: "{name} node {nodeId}"
+ slot: 1
+ index: 1
+}}\n""".format(name=self.name, nodeId=self.p4DeviceId)
+
+ intfNumber = 1
+ for intfName in self.intfNames():
+ if intfName == 'lo':
+ continue
+ config = config + """singleton_ports {{
+ id: {intfNumber}
+ name: "{intfName}"
+ slot: 1
+ port: {intfNumber}
+ channel: 1
+ speed_bps: 10000000000
+ config_params {{
+ admin_state: ADMIN_STATE_ENABLED
+ }}
+ node: {nodeId}
+}}\n""".format(intfName=intfName, intfNumber=intfNumber,
+ nodeId=self.p4DeviceId)
+ intfNumber += 1
+
+ return config
+
def doOnosNetcfg(self, controllerIP):
"""
Notifies ONOS about the new device via Netcfg.
@@ -265,6 +301,8 @@
if self.useStratum:
config_dir = "/tmp/bmv2-%s-stratum" % self.name
os.mkdir(config_dir)
+ with open(self.chassisConfigFile, 'w') as fp:
+ fp.write(self.chassisConfig())
cmdString = self.getStratumCmdString(config_dir)
else:
if self.thriftPort is None:
@@ -306,7 +344,8 @@
stratumRoot = getStratumRoot()
args = [
stratumRoot + STRATUM_BINARY,
- '-device_id=%d' % BMV2_DEFAULT_DEVICE_ID,
+ '-device_id=%d' % self.p4DeviceId,
+ '-chassis_config_file=%s' % self.chassisConfigFile,
'-forwarding_pipeline_configs_file=%s/config.txt' % config_dir,
'-persistent_config_dir=' + config_dir,
'-initial_pipeline=' + stratumRoot + STRATUM_INIT_PIPELINE,
@@ -314,13 +353,10 @@
'-external_hercules_urls=0.0.0.0:%d' % self.grpcPort,
'-max_num_controllers_per_node=10'
]
- for port, intf in self.intfs.items():
- if not intf.IP():
- args.append('%d@%s' % (port, intf.name))
return " ".join(args)
def bmv2Args(self):
- args = ['--device-id %s' % str(BMV2_DEFAULT_DEVICE_ID)]
+ args = ['--device-id %s' % str(self.p4DeviceId)]
for port, intf in self.intfs.items():
if not intf.IP():
args.append('-i %d@%s' % (port, intf.name))
diff --git a/tools/dev/p4vm/install-p4-tools.sh b/tools/dev/p4vm/install-p4-tools.sh
index 8b52c8b..546c74d 100755
--- a/tools/dev/p4vm/install-p4-tools.sh
+++ b/tools/dev/p4vm/install-p4-tools.sh
@@ -19,7 +19,7 @@
set -e
set -x
-BMV2_COMMIT="d4340832121be1be3852ca0bef709f6443ef86ed"
+BMV2_COMMIT="5d25d0d94681492d155d3e5b72b16a56121f8dfe"
PI_COMMIT="81b7e84bf8c27ce87571f66e5ccc76ce228caa8c"
P4C_COMMIT="5ae390430bd025b301854cd04c78b1ff9902180f"