Merge branch 'master' into dev-karaf-4.2.1
Change-Id: I260f0ee72fa87f1547fa790f031c4980176992ac
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java
index edcf6ed..70d75cc 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java
@@ -19,7 +19,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.onlab.packet.Ethernet;
-import org.onlab.packet.ICMP;
import org.onlab.packet.IPv4;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
@@ -884,34 +883,6 @@
PRIORITY_EXTERNAL_ROUTING_RULE,
GW_COMMON_TABLE,
install);
-
- // TODO: we do not remove the IcmpReplyMatchRules with false installation flag
- // need to find a better way to remove this rule
- if (install) {
- setIcmpReplyRules(deviceId, install);
- }
- }
-
- private void setIcmpReplyRules(DeviceId deviceId, boolean install) {
- // Sends ICMP response to controller for SNATing ingress traffic
- TrafficSelector selector = DefaultTrafficSelector.builder()
- .matchEthType(Ethernet.TYPE_IPV4)
- .matchIPProtocol(IPv4.PROTOCOL_ICMP)
- .matchIcmpType(ICMP.TYPE_ECHO_REPLY)
- .build();
-
- TrafficTreatment treatment = DefaultTrafficTreatment.builder()
- .punt()
- .build();
-
- osFlowRuleService.setRule(
- appId,
- deviceId,
- selector,
- treatment,
- PRIORITY_INTERNAL_ROUTING_RULE,
- GW_COMMON_TABLE,
- install);
}
private void setRouterAdminRules(String segmentId, NetworkType networkType, boolean install) {
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandler.java
index 1cc02b7..f685224 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandler.java
@@ -29,10 +29,15 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.packet.DefaultOutboundPacket;
import org.onosproject.net.packet.InboundPacket;
@@ -44,9 +49,12 @@
import org.onosproject.openstacknetworking.api.ExternalPeerRouter;
import org.onosproject.openstacknetworking.api.InstancePort;
import org.onosproject.openstacknetworking.api.InstancePortService;
+import org.onosproject.openstacknetworking.api.OpenstackFlowRuleService;
import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
import org.onosproject.openstacknetworking.api.OpenstackRouterService;
import org.onosproject.openstacknode.api.OpenstackNode;
+import org.onosproject.openstacknode.api.OpenstackNodeEvent;
+import org.onosproject.openstacknode.api.OpenstackNodeListener;
import org.onosproject.openstacknode.api.OpenstackNodeService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
@@ -74,7 +82,9 @@
import static org.onlab.packet.ICMP.TYPE_ECHO_REQUEST;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.openstacknetworking.api.Constants.DEFAULT_GATEWAY_MAC;
+import static org.onosproject.openstacknetworking.api.Constants.GW_COMMON_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.OPENSTACK_NETWORKING_APP_ID;
+import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_INTERNAL_ROUTING_RULE;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.GATEWAY;
import static org.slf4j.LoggerFactory.getLogger;
@@ -117,10 +127,20 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackRouterService osRouterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected OpenstackFlowRuleService osFlowRuleService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
private ConsistentMap<String, InstancePort> icmpInfoMap;
+ private final OpenstackNodeListener osNodeListener = new InternalNodeEventListener();
private static final KryoNamespace SERIALIZER_ICMP_MAP = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
@@ -130,11 +150,15 @@
.build();
private ApplicationId appId;
+ private NodeId localNodeId;
@Activate
protected void activate() {
appId = coreService.registerApplication(OPENSTACK_NETWORKING_APP_ID);
packetService.addProcessor(packetProcessor, PacketProcessor.director(1));
+ localNodeId = clusterService.getLocalNode().id();
+ leadershipService.runForLeadership(appId.name());
+ osNodeService.addListener(osNodeListener);
icmpInfoMap = storageService.<String, InstancePort>consistentMapBuilder()
.withSerializer(Serializer.using(SERIALIZER_ICMP_MAP))
@@ -149,6 +173,8 @@
protected void deactivate() {
packetService.removeProcessor(packetProcessor);
+ osNodeService.removeListener(osNodeListener); // detach before the executor it feeds is shut down
+ leadershipService.withdraw(appId.name());
eventExecutor.shutdown();
log.info("Stopped");
}
@@ -497,4 +523,50 @@
}
}
}
+
+ private class InternalNodeEventListener implements OpenstackNodeListener {
+ @Override
+ public boolean isRelevant(OpenstackNodeEvent event) {
+ // do not allow to proceed without leadership
+ NodeId leader = leadershipService.getLeader(appId.name());
+ return Objects.equals(localNodeId, leader) && event.subject().type() == GATEWAY;
+ }
+
+ @Override
+ public void event(OpenstackNodeEvent event) {
+ OpenstackNode osNode = event.subject();
+ switch (event.type()) {
+ case OPENSTACK_NODE_COMPLETE:
+ eventExecutor.execute(() -> setIcmpReplyRules(osNode.intgBridge(), true));
+ break;
+ case OPENSTACK_NODE_INCOMPLETE:
+ eventExecutor.execute(() -> setIcmpReplyRules(osNode.intgBridge(), false));
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void setIcmpReplyRules(DeviceId deviceId, boolean install) {
+ // Sends ICMP response to controller for SNATing ingress traffic
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ .matchEthType(Ethernet.TYPE_IPV4)
+ .matchIPProtocol(IPv4.PROTOCOL_ICMP)
+ .matchIcmpType(ICMP.TYPE_ECHO_REPLY)
+ .build();
+
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .punt()
+ .build();
+
+ osFlowRuleService.setRule(
+ appId,
+ deviceId,
+ selector,
+ treatment,
+ PRIORITY_INTERNAL_ROUTING_RULE,
+ GW_COMMON_TABLE,
+ install);
+ }
+ }
}
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
index e2f8f92..d88c05d 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
@@ -304,6 +304,9 @@
}
String gateway = osSubnet.getGateway();
+ if (gateway == null) {
+ return;
+ }
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchEthType(EthType.EtherType.ARP.ethType().toShort())
diff --git a/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandlerTest.java b/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandlerTest.java
index d5227c3..b7c4e7b 100644
--- a/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandlerTest.java
+++ b/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandlerTest.java
@@ -30,6 +30,8 @@
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.LeadershipServiceAdapter;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreServiceAdapter;
import org.onosproject.core.DefaultApplicationId;
@@ -117,6 +119,10 @@
icmpHandler.instancePortService = new TestInstancePortService();
icmpHandler.osNetworkService = new TestOpenstackNetworkService();
icmpHandler.osRouterService = new TestOpenstackRouterService();
+ icmpHandler.leadershipService = new TestLeadershipService();
+ icmpHandler.osFlowRuleService = new TestOpenstackFlowRuleService();
+ icmpHandler.clusterService = new TestClusterService();
+
TestUtils.setField(icmpHandler, "eventExecutor", MoreExecutors.newDirectExecutorService());
icmpHandler.activate();
@@ -511,4 +517,22 @@
return PortNumber.portNumber(1);
}
}
+
+ /**
+ * Mocks the LeadershipService.
+ */
+ private class TestLeadershipService extends LeadershipServiceAdapter {
+ }
+
+ /**
+ * Mocks the OpenstackFlowRuleService.
+ */
+ private class TestOpenstackFlowRuleService extends OpenstackFlowRuleServiceAdapter {
+ }
+
+ /**
+ * Mocks the ClusterService.
+ */
+ private class TestClusterService extends ClusterServiceAdapter {
+ }
}
diff --git a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultOpenstackNodeHandler.java b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultOpenstackNodeHandler.java
index 9f3d220..a670705 100644
--- a/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultOpenstackNodeHandler.java
+++ b/apps/openstacknode/app/src/main/java/org/onosproject/openstacknode/impl/DefaultOpenstackNodeHandler.java
@@ -643,20 +643,23 @@
public boolean isRelevant(DeviceEvent event) {
NodeId leader = leadershipService.getLeader(appId.name());
return Objects.equals(localNode, leader) &&
- event.subject().type() == Device.Type.CONTROLLER &&
- osNodeService.node(event.subject().id()) != null &&
- osNodeService.node(event.subject().id()).type() != CONTROLLER;
+ event.subject().type() == Device.Type.CONTROLLER;
}
@Override
public void event(DeviceEvent event) {
Device device = event.subject();
- OpenstackNode osNode = osNodeService.node(device.id());
switch (event.type()) {
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_ADDED:
eventExecutor.execute(() -> {
+ OpenstackNode osNode = osNodeService.node(device.id());
+
+ if (osNode == null || osNode.type() == CONTROLLER) {
+ return;
+ }
+
if (deviceService.isAvailable(device.id())) {
log.debug("OVSDB {} detected", device.id());
bootstrapNode(osNode);
@@ -685,20 +688,23 @@
public boolean isRelevant(DeviceEvent event) {
NodeId leader = leadershipService.getLeader(appId.name());
return Objects.equals(localNode, leader) &&
- event.subject().type() == Device.Type.SWITCH &&
- osNodeService.node(event.subject().id()) != null &&
- osNodeService.node(event.subject().id()).type() != CONTROLLER;
+ event.subject().type() == Device.Type.SWITCH;
}
@Override
public void event(DeviceEvent event) {
Device device = event.subject();
- OpenstackNode osNode = osNodeService.node(device.id());
switch (event.type()) {
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_ADDED:
eventExecutor.execute(() -> {
+ OpenstackNode osNode = osNodeService.node(device.id());
+
+ if (osNode == null || osNode.type() == CONTROLLER) {
+ return;
+ }
+
if (deviceService.isAvailable(device.id())) {
log.debug("Integration bridge created on {}", osNode.hostname());
bootstrapNode(osNode);
@@ -720,6 +726,12 @@
case PORT_UPDATED:
case PORT_ADDED:
eventExecutor.execute(() -> {
+ OpenstackNode osNode = osNodeService.node(device.id());
+
+ if (osNode == null || osNode.type() == CONTROLLER) {
+ return;
+ }
+
Port port = event.port();
String portName = port.annotations().value(PORT_NAME);
if (osNode.state() == DEVICE_CREATED && (
@@ -736,6 +748,12 @@
break;
case PORT_REMOVED:
eventExecutor.execute(() -> {
+ OpenstackNode osNode = osNodeService.node(device.id());
+
+ if (osNode == null || osNode.type() == CONTROLLER) {
+ return;
+ }
+
Port port = event.port();
String portName = port.annotations().value(PORT_NAME);
if (osNode.state() == COMPLETE && (
diff --git a/drivers/gnmi/BUILD b/drivers/gnmi/BUILD
index 63b11c6..4a4c100 100644
--- a/drivers/gnmi/BUILD
+++ b/drivers/gnmi/BUILD
@@ -5,6 +5,7 @@
"@io_grpc_grpc_java//stub",
"//core/store/serializers:onos-core-serializers",
"//protocols/gnmi/stub:onos-protocols-gnmi-stub",
+ "//protocols/gnmi/api:onos-protocols-gnmi-api",
"//protocols/grpc/api:onos-protocols-grpc-api",
"//protocols/grpc/proto:onos-protocols-grpc-proto",
]
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java
new file mode 100644
index 0000000..e363f4a
--- /dev/null
+++ b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.gnmi;
+
+import org.onosproject.gnmi.api.GnmiClient;
+import org.onosproject.gnmi.api.GnmiClientKey;
+import org.onosproject.gnmi.api.GnmiController;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract implementation of a behaviour handler for a gNMI device.
+ */
+public class AbstractGnmiHandlerBehaviour extends AbstractHandlerBehaviour {
+
+ public static final String GNMI_SERVER_ADDR_KEY = "gnmi_ip";
+ public static final String GNMI_SERVER_PORT_KEY = "gnmi_port";
+ private static final String GNMI_SERVICE_NAME = "gnmi";
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+ protected DeviceId deviceId;
+ protected DeviceService deviceService;
+ protected Device device;
+ protected GnmiController controller;
+ protected GnmiClient client;
+
+ protected boolean setupBehaviour() {
+ // Resolves the gNMI client for this device, creating one if the controller has none yet.
+ // FIXME: a GnmiHandshaker should initialize the client instead of creating it here.
+ deviceId = handler().data().deviceId();
+
+ controller = handler().get(GnmiController.class);
+ client = controller.getClient(deviceId);
+
+ if (client == null) {
+ client = createClient();
+ }
+
+ if (client == null) {
+ log.warn("Can not create client for {} (see log above)", deviceId);
+ return false;
+ }
+
+ return true;
+ }
+
+ protected GnmiClient createClient() {
+ // Builds a client from the gnmi_ip/gnmi_port driver data; returns null if data is missing/invalid or creation fails.
+ deviceId = handler().data().deviceId();
+ controller = handler().get(GnmiController.class);
+
+ final String serverAddr = this.data().value(GNMI_SERVER_ADDR_KEY);
+ final String serverPortString = this.data().value(GNMI_SERVER_PORT_KEY);
+
+ if (serverAddr == null || serverPortString == null) {
+ log.warn("Unable to create client for {}, missing driver data key (required is {} and {})",
+ deviceId, GNMI_SERVER_ADDR_KEY, GNMI_SERVER_PORT_KEY);
+ return null;
+ }
+
+ final int serverPort;
+ try {
+ serverPort = Integer.parseUnsignedInt(serverPortString);
+ } catch (NumberFormatException e) {
+ log.error("{} is not a valid gNMI port number", serverPortString);
+ return null;
+ }
+ GnmiClientKey clientKey = new GnmiClientKey(GNMI_SERVICE_NAME, deviceId, serverAddr, serverPort);
+ if (!controller.createClient(clientKey)) {
+ log.warn("Unable to create client for {}, aborting operation", deviceId);
+ return null;
+ }
+ return controller.getClient(deviceId);
+ }
+}
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiDeviceDescriptionDiscovery.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiDeviceDescriptionDiscovery.java
deleted file mode 100644
index e935920..0000000
--- a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiDeviceDescriptionDiscovery.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.drivers.gnmi;
-
-import com.google.common.collect.ImmutableList;
-import gnmi.gNMIGrpc;
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
-import io.grpc.netty.NettyChannelBuilder;
-import io.grpc.stub.StreamObserver;
-import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.api.GrpcController;
-import org.onosproject.net.DefaultAnnotations;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.Port;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.device.DefaultPortDescription;
-import org.onosproject.net.device.DeviceDescription;
-import org.onosproject.net.device.DeviceDescriptionDiscovery;
-import org.onosproject.net.device.PortDescription;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static gnmi.Gnmi.Path;
-import static gnmi.Gnmi.PathElem;
-import static gnmi.Gnmi.SubscribeRequest;
-import static gnmi.Gnmi.SubscribeResponse;
-import static gnmi.Gnmi.Subscription;
-import static gnmi.Gnmi.SubscriptionList;
-import static gnmi.Gnmi.Update;
-
-/**
- * Class that discovers the device description and ports of a device that
- * supports the gNMI protocol and Openconfig models.
- */
-public class GnmiDeviceDescriptionDiscovery
- extends AbstractHandlerBehaviour
- implements DeviceDescriptionDiscovery {
-
- private static final int REQUEST_TIMEOUT_SECONDS = 5;
-
- private static final Logger log = LoggerFactory
- .getLogger(GnmiDeviceDescriptionDiscovery.class);
-
- private static final String GNMI_SERVER_ADDR_KEY = "gnmi_ip";
- private static final String GNMI_SERVER_PORT_KEY = "gnmi_port";
-
- @Override
- public DeviceDescription discoverDeviceDetails() {
- return null;
- }
-
- @Override
- public List<PortDescription> discoverPortDetails() {
- log.info("Discovering port details on device {}", handler().data().deviceId());
-
- String serverAddr = this.data().value(GNMI_SERVER_ADDR_KEY);
- String serverPortString = this.data().value(GNMI_SERVER_PORT_KEY);
-
- if (serverAddr == null || serverPortString == null ||
- serverAddr.isEmpty() || serverPortString.isEmpty()) {
- log.warn("gNMI server information not provided, can't discover ports");
- return ImmutableList.of();
- }
-
- // Get the channel
- ManagedChannel channel = getChannel(serverAddr, serverPortString);
-
- if (channel == null) {
- return ImmutableList.of();
- }
-
- // Build the subscribe request
- SubscribeRequest request = subscribeRequest();
-
- // New stub
- gNMIGrpc.gNMIStub gnmiStub = gNMIGrpc.newStub(channel);
-
- final CompletableFuture<List<PortDescription>>
- reply = new CompletableFuture<>();
-
- // Subscribe to the replies
- StreamObserver<SubscribeRequest> subscribeRequest = gnmiStub
- .subscribe(new SubscribeResponseObserver(reply));
- log.debug("Interfaces request {}", request);
-
- List<PortDescription> ports;
- try {
- // Issue the request
- subscribeRequest.onNext(request);
- ports = reply.get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException
- | StatusRuntimeException e) {
- log.warn("Unable to discover ports from {}: {}",
- data().deviceId(), e.getMessage());
- log.debug("{}", e);
- return ImmutableList.of();
- } finally {
- subscribeRequest.onCompleted();
- }
-
- return ports;
- }
-
- /**
- * Obtains the ManagedChannel to be used for the communication.
- *
- * @return the managed channel
- */
- private ManagedChannel getChannel(String serverAddr, String serverPortString) {
-
- DeviceId deviceId = handler().data().deviceId();
-
- GrpcController controller = handler().get(GrpcController.class);
- ManagedChannel channel = null;
-
- //FIXME can be optimized
- //getting a channel if exists.
- ManagedChannel managedChannel = controller
- .getChannels(handler().data().deviceId()).stream().filter(c -> {
- String[] authority = c.authority().split(":");
- String host = authority[0];
- String port = authority[1];
- return host.equals(serverAddr) && port.equals(serverPortString);
- }).findAny().orElse(null);
-
- if (managedChannel != null) {
- log.debug("Reusing Channel");
- channel = managedChannel;
- } else {
- log.debug("Creating Channel");
- GrpcChannelId newChannelId = GrpcChannelId.of(deviceId, "gnmi");
-
- ManagedChannelBuilder channelBuilder = NettyChannelBuilder
- .forAddress(serverAddr, Integer.valueOf(serverPortString))
- .usePlaintext(true);
-
- try {
- channel = controller.connectChannel(newChannelId, channelBuilder);
- } catch (IOException e) {
- log.warn("Unable to connect to gRPC server of {}: {}",
- deviceId, e.getMessage());
- }
- }
- return channel;
- }
-
- /**
- * Creates the subscribe request for the interfaces.
- *
- * @return subscribe request
- */
- private SubscribeRequest subscribeRequest() {
- Path path = Path.newBuilder()
- .addElem(PathElem.newBuilder().setName("interfaces").build())
- .addElem(PathElem.newBuilder().setName("interface").build())
- .addElem(PathElem.newBuilder().setName("...").build())
- .build();
- Subscription subscription = Subscription.newBuilder().setPath(path).build();
- SubscriptionList list = SubscriptionList.newBuilder().setMode(SubscriptionList.Mode.ONCE)
- .addSubscription(subscription).build();
- return SubscribeRequest.newBuilder().setSubscribe(list).build();
- }
-
- /**
- * Handles messages received from the device on the stream channel.
- */
- private final class SubscribeResponseObserver
- implements StreamObserver<SubscribeResponse> {
-
- private final CompletableFuture<List<PortDescription>> reply;
-
- private SubscribeResponseObserver(CompletableFuture<List<PortDescription>> reply) {
- this.reply = reply;
- }
-
- @Override
- public void onNext(SubscribeResponse message) {
- Map<String, DefaultPortDescription.Builder> ports = new HashMap<>();
- Map<String, DefaultAnnotations.Builder> portsAnnotations = new HashMap<>();
- log.debug("Response {} ", message.getUpdate().toString());
- message.getUpdate().getUpdateList().forEach(update -> {
- parseUpdate(ports, portsAnnotations, update);
- });
-
- List<PortDescription> portDescriptionList = new ArrayList<>();
- ports.forEach((k, v) -> {
-// v.portSpeed(1000L);
- v.type(Port.Type.COPPER);
- v.annotations(portsAnnotations.get(k).set("name", k).build());
- portDescriptionList.add(v.build());
- });
-
- reply.complete(portDescriptionList);
- }
-
-
- @Override
- public void onError(Throwable throwable) {
- log.warn("Error on stream channel for {}: {}",
- data().deviceId(), Status.fromThrowable(throwable));
- log.debug("{}", throwable);
- }
-
- @Override
- public void onCompleted() {
- log.debug("SubscribeResponseObserver completed");
- }
- }
-
- /**
- * Parses the update received from the device.
- *
- * @param ports the ports description to build
- * @param portsAnnotations the ports annotations list to populate
- * @param update the update received
- */
- private void parseUpdate(Map<String, DefaultPortDescription.Builder> ports,
- Map<String, DefaultAnnotations.Builder> portsAnnotations,
- Update update) {
-
- //FIXME crude parsing, can be done via object (de)serialization
- if (update.getPath().getElemList().size() > 3) {
- String name = update.getPath().getElem(3).getName();
- String portId = update.getPath().getElem(1).getKeyMap().get("name");
- if (!ports.containsKey(portId)) {
- int number = Character.getNumericValue(portId.charAt(portId.length() - 1));
- PortNumber portNumber = PortNumber.portNumber(number, portId);
- ports.put(portId, DefaultPortDescription.builder()
- .withPortNumber(portNumber));
- }
- if (name.equals("enabled")) {
- DefaultPortDescription.Builder builder = ports.get(portId);
- builder = builder.isEnabled(update.getVal().getBoolVal());
- ports.put(portId, builder);
- } else if (name.equals("state")) {
- String speedName = update.getPath().getElem(4).getName();
- if (speedName.equals("negotiated-port-speed")) {
- DefaultPortDescription.Builder builder = ports.get(portId);
- long speed = parsePortSpeed(update.getVal().getStringVal());
- builder = builder.portSpeed(speed);
- ports.put(portId, builder);
- }
- } else if (!name.equals("ifindex")) {
- if (!portsAnnotations.containsKey(portId)) {
- portsAnnotations.put(portId, DefaultAnnotations.builder()
- .set(name, update.getVal().toByteString()
- .toString(Charset.defaultCharset()).trim()));
- } else {
- DefaultAnnotations.Builder builder = portsAnnotations.get(portId);
- builder = builder.set(name, update.getVal().toByteString().
- toString(Charset.defaultCharset()).trim());
- portsAnnotations.put(portId, builder);
- }
- }
- }
- }
-
- private long parsePortSpeed(String speed) {
- log.debug("Speed from config {}", speed);
- switch (speed) {
- case "SPEED_10MB":
- return 10;
- case "SPEED_100MB":
- return 100;
- case "SPEED_1GB":
- return 1000;
- case "SPEED_10GB":
- return 10000;
- case "SPEED_25GB":
- return 25000;
- case "SPEED_40GB":
- return 40000;
- case "SPEED_50GB":
- return 50000;
- case "SPEED_100GB":
- return 100000;
- default:
- return 1000;
- }
- }
-}
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiDeviceDescriptionDiscovery.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiDeviceDescriptionDiscovery.java
new file mode 100644
index 0000000..f9d2236
--- /dev/null
+++ b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/OpenConfigGnmiDeviceDescriptionDiscovery.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.gnmi;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import gnmi.Gnmi;
+import gnmi.Gnmi.GetRequest;
+import gnmi.Gnmi.GetResponse;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DefaultPortDescription;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceDescriptionDiscovery;
+import org.onosproject.net.device.PortDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static gnmi.Gnmi.Path;
+import static gnmi.Gnmi.PathElem;
+import static gnmi.Gnmi.Update;
+
+/**
+ * Class that discovers the device description and ports of a device that
+ * supports the gNMI protocol and Openconfig models.
+ */
+public class OpenConfigGnmiDeviceDescriptionDiscovery
+ extends AbstractGnmiHandlerBehaviour
+ implements DeviceDescriptionDiscovery {
+
+ private static final int REQUEST_TIMEOUT_SECONDS = 5;
+
+ private static final Logger log = LoggerFactory
+ .getLogger(OpenConfigGnmiDeviceDescriptionDiscovery.class);
+
+ @Override
+ public DeviceDescription discoverDeviceDetails() {
+ return null;
+ }
+
+ @Override
+ public List<PortDescription> discoverPortDetails() {
+ if (!setupBehaviour()) {
+ return Collections.emptyList();
+ }
+ log.debug("Discovering port details on device {}", handler().data().deviceId());
+
+ GetResponse response;
+ try {
+ response = client.get(buildPortStateRequest())
+ .get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ if (e instanceof InterruptedException) {
+ // Restore the interrupt flag so the calling thread can still observe the interruption.
+ Thread.currentThread().interrupt();
+ }
+ log.warn("Unable to discover ports from {}: {}", deviceId, e.getMessage());
+ log.debug("{}", e);
+ return Collections.emptyList();
+ }
+
+ Map<String, DefaultPortDescription.Builder> ports = Maps.newHashMap();
+ Map<String, DefaultAnnotations.Builder> annotations = Maps.newHashMap();
+
+ // Creates port descriptions with port name and port number; skips updates whose path lacks the interface element
+ response.getNotificationList().stream()
+ .flatMap(notification -> notification.getUpdateList().stream())
+ .filter(update -> update.getPath().getElemCount() > 1)
+ .forEach(update -> {
+ // /interfaces/interface[name=ifName]/state/...
+ String ifName = update.getPath().getElem(1).getKeyMap().get("name");
+ if (!ports.containsKey(ifName)) {
+ ports.put(ifName, DefaultPortDescription.builder());
+ annotations.put(ifName, DefaultAnnotations.builder());
+ }
+ parseInterfaceInfo(update, ifName, ports.get(ifName), annotations.get(ifName));
+ });
+
+ List<PortDescription> portDescriptionList = Lists.newArrayList();
+ ports.forEach((key, value) -> {
+ DefaultAnnotations annotation = annotations.get(key).build();
+ portDescriptionList.add(value.annotations(annotation).build());
+ });
+ return portDescriptionList;
+ }
+
+ private GetRequest buildPortStateRequest() {
+ Path path = Path.newBuilder()
+ .addElem(PathElem.newBuilder().setName("interfaces").build())
+ .addElem(PathElem.newBuilder().setName("interface").putKey("name", "...").build())
+ .addElem(PathElem.newBuilder().setName("state").build())
+ .build();
+ return GetRequest.newBuilder()
+ .addPath(path)
+ .setType(GetRequest.DataType.ALL)
+ .setEncoding(Gnmi.Encoding.PROTO)
+ .build();
+ }
+
+ /**
+ * Parses the interface information.
+ *
+ * @param update the update received
+ */
+ private void parseInterfaceInfo(Update update,
+ String ifName,
+ DefaultPortDescription.Builder builder,
+ DefaultAnnotations.Builder annotationsBuilder) {
+
+
+ Path path = update.getPath();
+ List<PathElem> elems = path.getElemList();
+ Gnmi.TypedValue val = update.getVal();
+ if (elems.size() == 4) {
+ // /interfaces/interface/state/ifindex
+ // /interfaces/interface/state/oper-status
+ String pathElemName = elems.get(3).getName();
+ switch (pathElemName) {
+ case "ifindex": // port number
+ builder.withPortNumber(PortNumber.portNumber(val.getUintVal(), ifName));
+ break;
+ case "oper-status":
+ builder.isEnabled(parseOperStatus(val.getStringVal()));
+ break;
+ default:
+ String valueString = val.toByteString().toString(Charset.defaultCharset()).trim();
+ if (!valueString.isEmpty()) {
+ annotationsBuilder.set(pathElemName, valueString);
+ }
+ log.debug("Unknown path: {}", path);
+ break;
+ }
+ }
+ if (elems.size() == 5) {
+ // /interfaces/interface/ethernet/config/port-speed
+ String pathElemName = elems.get(4).getName();
+ switch (pathElemName) {
+ case "port-speed":
+ builder.portSpeed(parsePortSpeed(val.getStringVal()));
+ break;
+ default:
+ String valueString = val.toByteString().toString(Charset.defaultCharset()).trim();
+ if (!valueString.isEmpty()) {
+ annotationsBuilder.set(pathElemName, valueString);
+ }
+ log.debug("Unknown path: {}", path);
+ break;
+ }
+ }
+ }
+
+ private boolean parseOperStatus(String operStatus) {
+ switch (operStatus) {
+ case "UP":
+ return true;
+ case "DOWN":
+ default:
+ return false;
+ }
+ }
+
+ private long parsePortSpeed(String speed) {
+ log.debug("Speed from config {}", speed);
+ switch (speed) {
+ case "SPEED_10MB":
+ return 10;
+ case "SPEED_100MB":
+ return 100;
+ case "SPEED_1GB":
+ return 1000;
+ case "SPEED_10GB":
+ return 10000;
+ case "SPEED_25GB":
+ return 25000;
+ case "SPEED_40GB":
+ return 40000;
+ case "SPEED_50GB":
+ return 50000;
+ case "SPEED_100GB":
+ return 100000;
+ default:
+ return 1000;
+ }
+ }
+}
diff --git a/drivers/gnmi/src/main/resources/gnmi-drivers.xml b/drivers/gnmi/src/main/resources/gnmi-drivers.xml
index 3744781..4ca539e 100644
--- a/drivers/gnmi/src/main/resources/gnmi-drivers.xml
+++ b/drivers/gnmi/src/main/resources/gnmi-drivers.xml
@@ -17,7 +17,7 @@
<drivers>
<driver name="gnmi" manufacturer="gnmi" hwVersion="master" swVersion="master">
<behaviour api="org.onosproject.net.device.DeviceDescriptionDiscovery"
- impl="org.onosproject.drivers.gnmi.GnmiDeviceDescriptionDiscovery"/>
+ impl="org.onosproject.drivers.gnmi.OpenConfigGnmiDeviceDescriptionDiscovery"/>
</driver>
</drivers>
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
index 3f6c9ad..90e7a31 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
@@ -27,6 +27,7 @@
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.net.pi.service.PiTranslationService;
import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -157,7 +158,9 @@
return null;
}
- if (!controller.createClient(deviceId, serverAddr, serverPort, p4DeviceId)) {
+ P4RuntimeClientKey clientKey = new
+ P4RuntimeClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
+ if (!controller.createClient(clientKey)) {
log.warn("Unable to create client for {}, aborting operation", deviceId);
return null;
}
diff --git a/modules.defs b/modules.defs
index b4097c7..5c16ff3 100644
--- a/modules.defs
+++ b/modules.defs
@@ -231,7 +231,7 @@
'//apps/l3vpn:onos-apps-l3vpn-oar',
'//apps/openroadm:onos-apps-openroadm-oar',
'//apps/artemis:onos-apps-artemis-oar',
- '//apps/pi-demo/ecmp:onos-apps-pi-demo-ecmp-oar',
+ #'//apps/pi-demo/ecmp:onos-apps-pi-demo-ecmp-oar',
'//apps/gluon:onos-apps-gluon-oar',
'//apps/evpnopenflow:onos-apps-evpnopenflow-oar',
'//apps/route-service:onos-apps-route-service-oar',
@@ -256,7 +256,7 @@
'//apps/mcast:onos-apps-mcast-oar',
'//apps/layout:onos-apps-layout-oar',
'//apps/imr:onos-apps-imr-oar',
- '//apps/inbandtelemetry/app:onos-apps-inbandtelemetry-app-oar',
+ #'//apps/inbandtelemetry/app:onos-apps-inbandtelemetry-app-oar',
'//apps/workflow:onos-apps-workflow-oar',
# nodemetrics application
'//apps/nodemetrics:onos-apps-nodemetrics-oar',
diff --git a/protocols/gnmi/BUILD b/protocols/gnmi/BUILD
index 8a3510d..0b6b6e6 100644
--- a/protocols/gnmi/BUILD
+++ b/protocols/gnmi/BUILD
@@ -1,11 +1,13 @@
BUNDLES = [
"//protocols/gnmi/stub:onos-protocols-gnmi-stub",
+ "//protocols/gnmi/ctl:onos-protocols-gnmi-ctl",
+ "//protocols/gnmi/api:onos-protocols-gnmi-api",
]
onos_app(
app_name = "org.onosproject.protocols.gnmi",
category = "Protocol",
- description = "ONOS gNMI protocol subsystem",
+ description = "gNMI protocol subsystem",
included_bundles = BUNDLES,
required_apps = [
"org.onosproject.protocols.grpc",
diff --git a/protocols/gnmi/api/BUILD b/protocols/gnmi/api/BUILD
new file mode 100644
index 0000000..1cf1a3d
--- /dev/null
+++ b/protocols/gnmi/api/BUILD
@@ -0,0 +1,8 @@
+COMPILE_DEPS = CORE_DEPS + [
+ "//protocols/grpc/api:onos-protocols-grpc-api",
+ "//protocols/gnmi/stub:onos-protocols-gnmi-stub",
+]
+
+osgi_jar(
+ deps = COMPILE_DEPS,
+)
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
new file mode 100644
index 0000000..7f43d1f
--- /dev/null
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.api;
+
+import com.google.common.annotations.Beta;
+
+import gnmi.Gnmi.CapabilityResponse;
+import gnmi.Gnmi.GetResponse;
+import gnmi.Gnmi.GetRequest;
+import gnmi.Gnmi.SetRequest;
+import gnmi.Gnmi.SetResponse;
+import org.onosproject.grpc.api.GrpcClient;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Client to control a gNMI server.
+ */
+@Beta
+public interface GnmiClient extends GrpcClient {
+
+ /**
+ * Gets capability from a target. This allows device driver behaviors
+ * to validate the service version and models supported by the gNMI device.
+ *
+ * @return the capability response
+ */
+ CompletableFuture<CapabilityResponse> capability();
+
+ /**
+ * Retrieves a snapshot of data from the device.
+ *
+ * @param request the get request
+ * @return the snapshot of data from the device
+ */
+ CompletableFuture<GetResponse> get(GetRequest request);
+
+ /**
+ * Modifies the state of data on the device.
+ *
+ * @param request the set request
+ * @return the set result
+ */
+ CompletableFuture<SetResponse> set(SetRequest request);
+
+ // TODO: Support gNMI subscription
+}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java
new file mode 100644
index 0000000..26ce113
--- /dev/null
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.api;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.grpc.api.GrpcClientKey;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Key that uniquely identifies a gNMI client.
+ */
+@Beta
+public class GnmiClientKey extends GrpcClientKey {
+
+ /**
+ * Creates a new gNMI client key.
+ *
+ * @param serviceName gNMI service name of the client
+ * @param deviceId ONOS device ID
+ * @param serverAddr gNMI server address
+ * @param serverPort gNMI server port
+ */
+ public GnmiClientKey(String serviceName, DeviceId deviceId, String serverAddr, int serverPort) {
+ super(serviceName, deviceId, serverAddr, serverPort);
+ }
+}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
new file mode 100644
index 0000000..f4964ed
--- /dev/null
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.api;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.grpc.api.GrpcClientController;
+
+/**
+ * Controller of gNMI devices.
+ */
+@Beta
+public interface GnmiController
+ extends GrpcClientController<GnmiClientKey, GnmiClient> {
+}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java
new file mode 100644
index 0000000..5129926
--- /dev/null
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.api;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Representation of an event received from a gNMI device.
+ */
+@Beta
+public final class GnmiEvent extends AbstractEvent<GnmiEvent.Type, GnmiEventSubject> {
+
+ /**
+ * Type of gNMI event.
+ */
+ public enum Type {
+ /**
+ * Update.
+ */
+ UPDATE,
+
+ /**
+ * Sync response.
+ */
+ SYNC_RESPONSE
+ }
+
+ protected GnmiEvent(Type type, GnmiEventSubject subject) {
+ super(type, subject);
+ }
+}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEventListener.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEventListener.java
new file mode 100644
index 0000000..0fe2fd3
--- /dev/null
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.api;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.EventListener;
+
+/**
+ * A listener of events received from gNMI devices.
+ */
+@Beta
+public interface GnmiEventListener extends EventListener<GnmiEvent> {
+}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEventSubject.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEventSubject.java
new file mode 100644
index 0000000..d1563ed
--- /dev/null
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEventSubject.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.api;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Information about an event generated by a gNMI device.
+ */
+@Beta
+public interface GnmiEventSubject {
+
+ /**
+ * Returns the deviceId associated to this subject.
+ *
+ * @return the device ID
+ */
+ DeviceId deviceId();
+}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/package-info.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/package-info.java
new file mode 100644
index 0000000..5f07b46
--- /dev/null
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * gNMI protocol API.
+ */
+package org.onosproject.gnmi.api;
\ No newline at end of file
diff --git a/protocols/gnmi/ctl/BUILD b/protocols/gnmi/ctl/BUILD
new file mode 100644
index 0000000..db4cd44
--- /dev/null
+++ b/protocols/gnmi/ctl/BUILD
@@ -0,0 +1,23 @@
+COMPILE_DEPS = CORE_DEPS + KRYO + [
+ "//protocols/gnmi/api:onos-protocols-gnmi-api",
+ "//protocols/gnmi/stub:onos-protocols-gnmi-stub",
+ "//protocols/grpc/api:onos-protocols-grpc-api",
+ "//protocols/grpc/ctl:onos-protocols-grpc-ctl",
+ "//protocols/grpc:grpc-core-repkg",
+ "@com_google_protobuf//:protobuf_java",
+ "@io_grpc_grpc_java//netty",
+ "@io_grpc_grpc_java//protobuf-lite",
+ "@io_grpc_grpc_java//stub",
+ "@com_google_api_grpc_proto_google_common_protos//jar",
+]
+
+TEST_DEPS = TEST + [
+ "@minimal_json//jar",
+ "@io_grpc_grpc_java//core:inprocess",
+ "@io_grpc_grpc_java//protobuf-lite",
+]
+
+osgi_jar_with_tests(
+ test_deps = TEST_DEPS,
+ deps = COMPILE_DEPS,
+)
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
new file mode 100644
index 0000000..74a9ec0
--- /dev/null
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package protocols.gnmi.ctl.java.org.onosproject.gnmi.ctl;
+
+import gnmi.Gnmi.CapabilityRequest;
+import gnmi.Gnmi.CapabilityResponse;
+import gnmi.Gnmi.GetRequest;
+import gnmi.Gnmi.GetResponse;
+import gnmi.Gnmi.SetRequest;
+import gnmi.Gnmi.SetResponse;
+import gnmi.gNMIGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
+import org.onosproject.gnmi.api.GnmiClientKey;
+import org.onosproject.grpc.ctl.AbstractGrpcClient;
+import org.slf4j.Logger;
+import org.onosproject.gnmi.api.GnmiClient;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of gNMI client.
+ */
+public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
+ private final Logger log = getLogger(getClass());
+ private final gNMIGrpc.gNMIBlockingStub blockingStub;
+
+ public GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel) {
+ super(clientKey, managedChannel);
+ this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
+ }
+
+ @Override
+ public CompletableFuture<CapabilityResponse> capability() {
+ return supplyInContext(this::doCapability, "capability");
+ }
+
+ @Override
+ public CompletableFuture<GetResponse> get(GetRequest request) {
+ return supplyInContext(() -> doGet(request), "get");
+ }
+
+ @Override
+ public CompletableFuture<SetResponse> set(SetRequest request) {
+ return supplyInContext(() -> doSet(request), "set");
+ }
+
+ private CapabilityResponse doCapability() {
+ CapabilityRequest request = CapabilityRequest.newBuilder().build();
+ try {
+ return blockingStub.capabilities(request);
+ } catch (StatusRuntimeException e) {
+ log.warn("Unable to get capability from {}: {}", deviceId, e.getMessage());
+ return null;
+ }
+ }
+
+ private GetResponse doGet(GetRequest request) {
+ try {
+ return blockingStub.get(request);
+ } catch (StatusRuntimeException e) {
+ log.warn("Unable to get data from {}: {}", deviceId, e.getMessage());
+ return null;
+ }
+ }
+
+ private SetResponse doSet(SetRequest request) {
+ try {
+ return blockingStub.set(request);
+ } catch (StatusRuntimeException e) {
+ log.warn("Unable to set data to {}: {}", deviceId, e.getMessage());
+ return null;
+ }
+ }
+}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
new file mode 100644
index 0000000..8392c4a
--- /dev/null
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package protocols.gnmi.ctl.java.org.onosproject.gnmi.ctl;
+
+import io.grpc.ManagedChannel;
+import org.onosproject.gnmi.api.GnmiClient;
+import org.onosproject.gnmi.api.GnmiClientKey;
+import org.onosproject.gnmi.api.GnmiController;
+import org.onosproject.gnmi.api.GnmiEvent;
+import org.onosproject.gnmi.api.GnmiEventListener;
+import org.onosproject.grpc.ctl.AbstractGrpcClientController;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of gNMI controller.
+ */
+@Component(immediate = true, service = GnmiController.class)
+public class GnmiControllerImpl
+ extends AbstractGrpcClientController<GnmiClientKey, GnmiClient, GnmiEvent, GnmiEventListener>
+ implements GnmiController {
+ private final Logger log = getLogger(getClass());
+
+ @Activate
+ public void activate() {
+ super.activate();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ super.deactivate();
+ log.info("Stopped");
+ }
+
+ @Override
+ protected GnmiClient createClientInstance(GnmiClientKey clientKey, ManagedChannel channel) {
+ return new GnmiClientImpl(clientKey, channel);
+ }
+}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/package-info.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/package-info.java
new file mode 100644
index 0000000..ae46aa4
--- /dev/null
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation classes of the gNMI protocol subsystem.
+ */
+package protocols.gnmi.ctl.java.org.onosproject.gnmi.ctl;
\ No newline at end of file
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
similarity index 98%
rename from protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java
rename to protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
index 33df22b..5b6771c 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
@@ -30,7 +30,7 @@
* Abstraction of a gRPC controller that stores and manages gRPC channels.
*/
@Beta
-public interface GrpcController {
+public interface GrpcChannelController {
int CONNECTION_TIMEOUT_SECONDS = 20;
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
new file mode 100644
index 0000000..d040a23
--- /dev/null
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.api;
+
+import com.google.common.annotations.Beta;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction of a gRPC client.
+ *
+ */
+@Beta
+public interface GrpcClient {
+
+ /**
+ * Shuts down the client by terminating any active RPC.
+ *
+ * @return a completable future to signal the completion of the shutdown
+ * procedure
+ */
+ CompletableFuture<Void> shutdown();
+}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
new file mode 100644
index 0000000..087898b
--- /dev/null
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.api;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Abstraction of a gRPC controller which controls a specific gRPC
+ * client type {@code C} with a specific client key type {@code K}.
+ *
+ * @param <K> the gRPC client key
+ * @param <C> the gRPC client type
+ */
+@Beta
+public interface GrpcClientController<K extends GrpcClientKey, C extends GrpcClient> {
+
+ /**
+ * Instantiates a new client to operate on a gRPC server identified by
+ * the given information. As a result of this method, a client can be later
+ * obtained by invoking {@link #getClient(DeviceId)}.
+ *
+ * Only one client can exist for the same device ID. Calls to this method are
+ * idempotent for the same client key, i.e. it returns true
+ * if such a client already exists, but a new one is not created.
+ * If there exists a client with the same device ID but a different address or
+ * port, the old one is removed and a new one is created.
+ *
+ * @param clientKey the client key
+ * @return true if the client was created and the channel to the server is open;
+ * false otherwise
+ */
+ boolean createClient(K clientKey);
+
+ /**
+ * Retrieves the gRPC client to operate on the given device.
+ *
+ * @param deviceId the device identifier
+ * @return the gRPC client of the device if exists; null otherwise
+ */
+ C getClient(DeviceId deviceId);
+
+ /**
+ * Removes the gRPC client for the given device. If no client
+ * exists for the given device, the result is a no-op.
+ *
+ * @param deviceId the device identifier
+ */
+ void removeClient(DeviceId deviceId);
+
+ /**
+ * Checks reachability of the gRPC server running on the given device.
+ * Reachability can be tested only if a client is previously created
+ * using {@link #createClient(GrpcClientKey)}.
+ * Different gRPC services may have different ways to test if they are
+ * reachable or not.
+ *
+ * @param deviceId the device identifier
+ * @return true if client was created and is able to contact the gRPC server;
+ * false otherwise
+ */
+ boolean isReachable(DeviceId deviceId);
+}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
new file mode 100644
index 0000000..ad8a7da
--- /dev/null
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.api;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import org.onosproject.net.DeviceId;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Key that uniquely identifies a gRPC client.
+ */
+@Beta
+public class GrpcClientKey {
+ private final String serviceName;
+ private final DeviceId deviceId;
+ private final String serverAddr;
+ private final int serverPort;
+
+ /**
+ * Creates a new client key.
+ *
+ * @param serviceName gRPC service name of the client
+ * @param deviceId ONOS device ID
+ * @param serverAddr gRPC server address
+ * @param serverPort gRPC server port
+ */
+ public GrpcClientKey(String serviceName, DeviceId deviceId, String serverAddr, int serverPort) {
+ checkNotNull(serviceName);
+ checkNotNull(deviceId);
+ checkNotNull(serverAddr);
+ checkArgument(!serviceName.isEmpty(),
+ "Service name can not be null");
+ checkArgument(!serverAddr.isEmpty(),
+ "Server address should not be empty");
+ checkArgument(serverPort > 0 && serverPort <= 65535, "Invalid server port");
+ this.serviceName = serviceName;
+ this.deviceId = deviceId;
+ this.serverAddr = serverAddr;
+ this.serverPort = serverPort;
+ }
+
+ /**
+ * Gets the gRPC service name of the client.
+ *
+ * @return the service name
+ */
+ public String serviceName() {
+ return serviceName;
+ }
+
+ /**
+ * Gets the device ID.
+ *
+ * @return the device ID
+ */
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ /**
+ * Gets the gRPC server address.
+ *
+ * @return the gRPC server address.
+ */
+ public String serverAddr() {
+ return serverAddr;
+ }
+
+ /**
+ * Gets the gRPC server port.
+ *
+ * @return the gRPC server port.
+ */
+ public int serverPort() {
+ return serverPort;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GrpcClientKey that = (GrpcClientKey) o;
+ return serverPort == that.serverPort &&
+ Objects.equal(serviceName, that.serviceName) &&
+ Objects.equal(deviceId, that.deviceId) &&
+ Objects.equal(serverAddr, that.serverAddr);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(serviceName, deviceId, serverAddr, serverPort);
+ }
+
+ protected MoreObjects.ToStringHelper toStringHelper() {
+ return MoreObjects.toStringHelper(this)
+ .add("serviceName", serviceName)
+ .add("deviceId", deviceId)
+ .add("serverAddr", serverAddr)
+ .add("serverPort", serverPort);
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper().toString();
+ }
+}
\ No newline at end of file
diff --git a/protocols/grpc/ctl/BUILD b/protocols/grpc/ctl/BUILD
index 20c7ab8..ac0703d 100644
--- a/protocols/grpc/ctl/BUILD
+++ b/protocols/grpc/ctl/BUILD
@@ -2,6 +2,7 @@
"//protocols/grpc/api:onos-protocols-grpc-api",
"//protocols/grpc/proto:onos-protocols-grpc-proto",
"@io_grpc_grpc_java//core",
+ "@io_grpc_grpc_java//netty",
]
osgi_jar(
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
new file mode 100644
index 0000000..4764a56
--- /dev/null
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.ctl;
+
+import io.grpc.Context;
+import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
+import org.onlab.util.SharedExecutors;
+import org.onosproject.grpc.api.GrpcClient;
+import org.onosproject.grpc.api.GrpcClientKey;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
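+/**
+ * Abstract client for a gRPC service. Requests submitted through this class
+ * are serialized by a per-client lock and can run in a cancellable context.
+ */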
+public abstract class AbstractGrpcClient implements GrpcClient {
+
+ // Timeout in seconds to obtain the request lock.
+ protected static final int LOCK_TIMEOUT = 60;
+ private static final int DEFAULT_THREAD_POOL_SIZE = 10;
+
+ protected final Logger log = getLogger(getClass());
+
+ protected final Lock requestLock = new ReentrantLock();
+ protected final Context.CancellableContext cancellableContext =
+ Context.current().withCancellation();
+ protected final ExecutorService executorService;
+ protected final Executor contextExecutor;
+
+ protected ManagedChannel channel;
+ protected DeviceId deviceId;
+
+ protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel) {
+ this.deviceId = clientKey.deviceId();
+ this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
+ "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
+ this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
+ this.channel = channel;
+ }
+
+ @Override
+ public CompletableFuture<Void> shutdown() {
+ return supplyWithExecutor(this::doShutdown, "shutdown",
+ SharedExecutors.getPoolThreadExecutor());
+ }
+
+    protected Void doShutdown() {
+        log.debug("Shutting down client for {}...", deviceId);
+        cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
+        this.executorService.shutdownNow();
+        try {
+            if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+                log.warn("Executor service didn't shutdown in time.");
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+        }
+        return null;
+    }
+
+ /**
+ * Equivalent of supplyWithExecutor using the gRPC context executor of this
+ * client, such that if the context is cancelled (e.g. client shutdown) the
+ * RPC is automatically cancelled.
+ *
+ * @param <U> return type of supplier
+ * @param supplier the supplier to be executed
+ * @param opDescription the description of this supplier
+ * @return CompletableFuture includes the result of supplier
+ */
+ protected <U> CompletableFuture<U> supplyInContext(
+ Supplier<U> supplier, String opDescription) {
+ return supplyWithExecutor(supplier, opDescription, contextExecutor);
+ }
+
+    /**
+     * Submits a task for async execution via the given executor. Every task
+     * must hold the request lock while running, so tasks execute sequentially.
+     *
+     * @param <U> return type of supplier
+     * @param supplier the supplier to be executed
+     * @param opDescription the description of this supplier
+     * @param executor the executor to execute this supplier
+     * @return future with the supplier result; fails on lock timeout, interrupt or supplier error
+     */
+    protected <U> CompletableFuture<U> supplyWithExecutor(
+            Supplier<U> supplier, String opDescription, Executor executor) {
+        return CompletableFuture.supplyAsync(() -> {
+            // TODO: explore a more relaxed locking strategy.
+            try {
+                if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
+                    log.error("LOCK TIMEOUT! This is likely a deadlock, "
+                                      + "please debug (executing {})",
+                              opDescription);
+                    throw new IllegalThreadStateException("Lock timeout");
+                }
+            } catch (InterruptedException e) {
+                log.warn("Thread interrupted while waiting for lock (executing {})", opDescription);
+                Thread.currentThread().interrupt(); // restore the flag cleared by tryLock()
+                throw new IllegalStateException(e);
+            }
+            try {
+                return supplier.get();
+            } catch (StatusRuntimeException ex) {
+                log.warn("Unable to execute {} on {}: {}",
+                         opDescription, deviceId, ex.toString());
+                throw ex;
+            } catch (Throwable ex) {
+                log.error("Exception in client of {}, executing {}",
+                          deviceId, opDescription, ex);
+                throw ex;
+            } finally {
+                requestLock.unlock();
+            }
+        }, executor);
+    }
+}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
new file mode 100644
index 0000000..36e453c
--- /dev/null
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.ctl;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.NettyChannelBuilder;
+
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.event.Event;
+import org.onosproject.event.EventListener;
+import org.onosproject.grpc.api.GrpcChannelController;
+import org.onosproject.grpc.api.GrpcChannelId;
+import org.onosproject.grpc.api.GrpcClient;
+import org.onosproject.grpc.api.GrpcClientController;
+import org.onosproject.grpc.api.GrpcClientKey;
+import org.onosproject.net.DeviceId;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Abstract gRPC client controller for a specific type of gRPC client, which
+ * provides basic client management guarded by locks striped on the device ID.
+ *
+ * @param <K> the key type of the gRPC client
+ * @param <C> the gRPC client type
+ * @param <E> the event type of the gRPC client
+ * @param <L> the listener type for events of type {@code E}
+ */
+@Component
+public abstract class AbstractGrpcClientController
+ <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
+ extends AbstractListenerManager<E, L>
+ implements GrpcClientController<K, C> {
+
+    // Default max inbound message size, expressed in megabytes.
+    private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256;
+    private static final int MEGABYTES = 1024 * 1024;
+    private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
+
+    private final Logger log = getLogger(getClass());
+    // Device locks are striped, so calls for different devices can run in
+    // parallel: the shared maps below must tolerate concurrent access.
+    private final Map<DeviceId, K> clientKeys = Maps.newConcurrentMap();
+    private final Map<K, C> clients = Maps.newConcurrentMap();
+    private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newConcurrentMap();
+ private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private GrpcChannelController grpcChannelController;
+
+ @Activate
+ public void activate() {
+ log.info("Started");
+ }
+
+    @Deactivate
+    public void deactivate() {
+        Maps.newHashMap(clientKeys).keySet().forEach(this::removeClient); // copy: removal mutates clientKeys
+        clientKeys.clear();
+        clients.clear();
+        channelIds.clear();
+        log.info("Stopped");
+    }
+
+ @Override
+ public boolean createClient(K clientKey) {
+ checkNotNull(clientKey);
+ return withDeviceLock(() -> doCreateClient(clientKey), clientKey.deviceId());
+ }
+
+    private boolean doCreateClient(K clientKey) {
+        DeviceId deviceId = clientKey.deviceId();
+        String serverAddr = clientKey.serverAddr();
+        int serverPort = clientKey.serverPort();
+        // Idempotent for an identical key; a different key replaces the old client.
+        if (clientKeys.containsKey(deviceId)) {
+            final GrpcClientKey existingKey = clientKeys.get(deviceId);
+            if (clientKey.equals(existingKey)) {
+                log.debug("Not creating client for {} as it already exists (key={})...",
+                          deviceId, clientKey);
+                return true;
+            } else {
+                log.info("Requested client for {} with new " +
+                                 "endpoint, removing old client (key={})...",
+                         deviceId, existingKey);
+                doRemoveClient(deviceId);
+            }
+        }
+        log.info("Creating client for {} (server={}:{})...",
+                 deviceId, serverAddr, serverPort);
+        GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(), clientKey.toString());
+        ManagedChannelBuilder channelBuilder = NettyChannelBuilder
+                .forAddress(serverAddr, serverPort)
+                .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES)
+                .usePlaintext();
+        ManagedChannel channel;
+        try {
+            channel = grpcChannelController.connectChannel(channelId, channelBuilder);
+        } catch (IOException e) {
+            log.warn("Unable to connect to gRPC server of {}: {}",
+                     clientKey.deviceId(), e.getMessage());
+            return false;
+        }
+
+        C client = createClientInstance(clientKey, channel);
+        if (client == null) {
+            log.warn("Cannot create client for {} (key={})", deviceId, clientKey);
+            // The channel was connected above: release it so it is not left behind.
+            grpcChannelController.disconnectChannel(channelId);
+            return false;
+        }
+        clientKeys.put(deviceId, clientKey);
+        clients.put(clientKey, client);
+        channelIds.put(deviceId, channelId);
+        return true;
+    }
+
+ protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
+
+ @Override
+ public C getClient(DeviceId deviceId) {
+ checkNotNull(deviceId);
+ return withDeviceLock(() -> doGetClient(deviceId), deviceId);
+ }
+
+    protected C doGetClient(DeviceId deviceId) {
+        // Two-step lookup: device ID -> client key -> client instance.
+        // Yields null when no key is registered for the device.
+        return clientKeys.containsKey(deviceId)
+                ? clients.get(clientKeys.get(deviceId)) : null;
+    }
+
+ @Override
+ public void removeClient(DeviceId deviceId) {
+ checkNotNull(deviceId);
+ withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
+ }
+
+ private Void doRemoveClient(DeviceId deviceId) {
+ if (clientKeys.containsKey(deviceId)) {
+ final K clientKey = clientKeys.get(deviceId);
+ clients.get(clientKey).shutdown();
+ grpcChannelController.disconnectChannel(channelIds.get(deviceId));
+ clientKeys.remove(deviceId);
+ clients.remove(clientKey);
+ channelIds.remove(deviceId);
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isReachable(DeviceId deviceId) {
+ checkNotNull(deviceId);
+ return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
+ }
+
+    protected boolean doIsReachable(DeviceId deviceId) {
+        // Default behaviour only asks the channel controller whether the gRPC
+        // channel is open; subclasses may add a service-specific check.
+        if (!clientKeys.containsKey(deviceId)) {
+            log.debug("No client for {}, can't check for reachability", deviceId);
+            return false;
+        }
+        return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
+    }
+
+ private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
+ final Lock lock = stripedLocks.get(deviceId);
+ lock.lock();
+ try {
+ return task.get();
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
similarity index 96%
rename from protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
rename to protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index 7d038ce..1726203 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -33,8 +33,8 @@
import io.grpc.StatusRuntimeException;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.grpc.api.GrpcChannelController;
import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.api.GrpcController;
import org.onosproject.grpc.proto.dummy.Dummy;
import org.onosproject.grpc.proto.dummy.DummyServiceGrpc;
import org.onosproject.net.DeviceId;
@@ -64,14 +64,15 @@
import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG_DEFAULT;
/**
- * Default implementation of the GrpcController.
+ * Default implementation of the GrpcChannelController.
*/
-@Component(immediate = true, service = GrpcController.class,
+@Component(immediate = true, service = GrpcChannelController.class,
property = {
ENABLE_MESSAGE_LOG + ":Boolean=" + ENABLE_MESSAGE_LOG_DEFAULT,
})
-public class GrpcControllerImpl implements GrpcController {
+public class GrpcChannelControllerImpl implements GrpcChannelController {
+ // FIXME: Should use message size to determine whether it needs to log the message or not.
private static final String SET_FORWARDING_PIPELINE_CONFIG_METHOD = "p4.P4Runtime/SetForwardingPipelineConfig";
@Reference(cardinality = ReferenceCardinality.MANDATORY)
diff --git a/protocols/p4runtime/api/BUILD b/protocols/p4runtime/api/BUILD
index 531d734..2fb15ba 100644
--- a/protocols/p4runtime/api/BUILD
+++ b/protocols/p4runtime/api/BUILD
@@ -1,4 +1,5 @@
COMPILE_DEPS = CORE_DEPS + [
+ "//protocols/grpc/api:onos-protocols-grpc-api",
"@io_grpc_grpc_java//core",
]
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
index f44c629..7a08668 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
@@ -17,6 +17,7 @@
package org.onosproject.p4runtime.api;
import com.google.common.annotations.Beta;
+import org.onosproject.grpc.api.GrpcClient;
import org.onosproject.net.pi.model.PiActionProfileId;
import org.onosproject.net.pi.model.PiCounterId;
import org.onosproject.net.pi.model.PiMeterId;
@@ -42,7 +43,7 @@
* Client to control a P4Runtime device.
*/
@Beta
-public interface P4RuntimeClient {
+public interface P4RuntimeClient extends GrpcClient {
/**
* Type of write operation.
@@ -70,15 +71,6 @@
boolean isStreamChannelOpen();
/**
- * Shutdowns the client by terminating any active RPC such as the Stream
- * one.
- *
- * @return a completable future to signal the completion of the shutdown
- * procedure
- */
- CompletableFuture<Void> shutdown();
-
- /**
* Sends a master arbitration update to the device with a new election ID
* that is guaranteed to be the highest value between all clients.
*
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
new file mode 100644
index 0000000..70bf33c
--- /dev/null
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.api;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.grpc.api.GrpcClientKey;
+import org.onosproject.net.DeviceId;
+
+import java.util.Objects;
+
+/**
+ * Key that uniquely identifies a P4Runtime client.
+ */
+@Beta
+public final class P4RuntimeClientKey extends GrpcClientKey {
+ private static final String P4R_SERVICE_NAME = "p4runtime";
+ private final long p4DeviceId;
+
+ /**
+ * Creates a new client key.
+ *
+ * @param deviceId ONOS device ID
+ * @param serverAddr P4Runtime server address
+ * @param serverPort P4Runtime server port
+ * @param p4DeviceId P4Runtime server-internal device ID
+ */
+ public P4RuntimeClientKey(DeviceId deviceId, String serverAddr,
+ int serverPort, long p4DeviceId) {
+ super(P4R_SERVICE_NAME, deviceId, serverAddr, serverPort);
+ this.p4DeviceId = p4DeviceId;
+ }
+
+ /**
+ * Returns the P4Runtime server-internal device ID.
+ *
+ * @return P4Runtime server-internal device ID
+ */
+ public long p4DeviceId() {
+ return p4DeviceId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ P4RuntimeClientKey that = (P4RuntimeClientKey) o;
+ return p4DeviceId == that.p4DeviceId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), p4DeviceId);
+ }
+
+ @Override
+ public String toString() {
+ return super.toStringHelper()
+ .add("p4DeviceId", p4DeviceId)
+ .toString();
+ }
+}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 159e313..0bc8ed6 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -18,6 +18,7 @@
import com.google.common.annotations.Beta;
import org.onosproject.event.ListenerService;
+import org.onosproject.grpc.api.GrpcClientController;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.provider.ProviderId;
@@ -27,73 +28,8 @@
*/
@Beta
public interface P4RuntimeController
- extends ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
-
- /**
- * Instantiates a new client to operate on a P4Runtime device identified by
- * the given information. As a result of this method, a {@link
- * P4RuntimeClient} can be later obtained by invoking {@link
- * #getClient(DeviceId)}. Returns true if the client was created and the
- * channel to the device is open, false otherwise.
- * <p>
- * Only one client can exist for the same device ID. Calls to this method
- * are idempotent for the same [device ID, address, port, p4DeviceId]
- * triplet, i.e. returns true if such client already exists but a new one is
- * not created. Throws an {@link IllegalStateException} if a client for
- * device ID already exists but for different [address, port, p4DeviceId].
- *
- * @param deviceId device identifier
- * @param serverAddr address of the P4Runtime server
- * @param serverPort port of the P4Runtime server
- * @param p4DeviceId P4Runtime-specific device identifier
- * @return true if the client was created and the channel to the device is
- * open
- * @throws IllegalStateException if a client already exists for this device
- * ID but for different [address, port,
- * p4DeviceId] triplet.
- */
- boolean createClient(DeviceId deviceId, String serverAddr, int serverPort,
- long p4DeviceId);
-
- /**
- * Returns a client to operate on the given device, or null if a client for
- * such device does not exist in this controller.
- *
- * @param deviceId device identifier
- * @return client instance or null
- */
- P4RuntimeClient getClient(DeviceId deviceId);
-
- /**
- * Removes the client for the given device. If no client exists for the
- * given device identifier, the result is a no-op.
- *
- * @param deviceId device identifier
- */
- void removeClient(DeviceId deviceId);
-
- /**
- * Returns true if a client exists for the given device identifier, false
- * otherwise.
- *
- * @param deviceId device identifier
- * @return true if client exists, false otherwise.
- */
- boolean hasClient(DeviceId deviceId);
-
- /**
- * Returns true if the P4Runtime server running on the given device is
- * reachable, i.e. the channel is open and the server is able to respond to
- * RPCs, false otherwise. Reachability can be tested only if a client was
- * previously created using {@link #createClient(DeviceId, String, int,
- * long)}, otherwise this method returns false.
- *
- * @param deviceId device identifier.
- * @return true if a client was created and is able to contact the P4Runtime
- * server, false otherwise.
- */
- boolean isReachable(DeviceId deviceId);
-
+ extends GrpcClientController<P4RuntimeClientKey, P4RuntimeClient>,
+ ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
/**
* Adds a listener for device agent events for the given provider.
*
diff --git a/protocols/p4runtime/ctl/BUILD b/protocols/p4runtime/ctl/BUILD
index d6c0c7a..66aab43 100644
--- a/protocols/p4runtime/ctl/BUILD
+++ b/protocols/p4runtime/ctl/BUILD
@@ -1,6 +1,7 @@
COMPILE_DEPS = CORE_DEPS + KRYO + [
"//core/store/serializers:onos-core-serializers",
"//protocols/grpc/api:onos-protocols-grpc-api",
+ "//protocols/grpc/ctl:onos-protocols-grpc-ctl",
"//protocols/p4runtime/api:onos-protocols-p4runtime-api",
"//protocols/p4runtime/proto:onos-protocols-p4runtime-proto",
"@com_google_protobuf//:protobuf_java",
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ClientKey.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ClientKey.java
deleted file mode 100644
index cc4bed0..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ClientKey.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl;
-
-import com.google.common.base.MoreObjects;
-import org.onosproject.net.DeviceId;
-
-import java.util.Objects;
-
-/**
- * Key that uniquely identifies a P4Runtime client.
- */
-final class ClientKey {
-
- private final DeviceId deviceId;
- private final String serverAddr;
- private final int serverPort;
- private final long p4DeviceId;
-
- /**
- * Creates a new client key.
- *
- * @param deviceId ONOS device ID
- * @param serverAddr P4Runtime server address
- * @param serverPort P4Runtime server port
- * @param p4DeviceId P4Runtime server-internal device ID
- */
- ClientKey(DeviceId deviceId, String serverAddr, int serverPort, long p4DeviceId) {
- this.deviceId = deviceId;
- this.serverAddr = serverAddr;
- this.serverPort = serverPort;
- this.p4DeviceId = p4DeviceId;
- }
-
- /**
- * Returns the device ID.
- *
- * @return device ID.
- */
- public DeviceId deviceId() {
- return deviceId;
- }
-
- /**
- * Returns the P4Runtime server address.
- *
- * @return P4Runtime server address
- */
- public String serverAddr() {
- return serverAddr;
- }
-
- /**
- * Returns the P4Runtime server port.
- *
- * @return P4Runtime server port
- */
- public int serverPort() {
- return serverPort;
- }
-
- /**
- * Returns the P4Runtime server-internal device ID.
- *
- * @return P4Runtime server-internal device ID
- */
- public long p4DeviceId() {
- return p4DeviceId;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(serverAddr, serverPort, p4DeviceId);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- final ClientKey other = (ClientKey) obj;
- return Objects.equals(this.serverAddr, other.serverAddr)
- && Objects.equals(this.serverPort, other.serverPort)
- && Objects.equals(this.p4DeviceId, other.p4DeviceId);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("deviceId", deviceId)
- .add("serverAddr", serverAddr)
- .add("serverPort", serverPort)
- .add("p4DeviceId", p4DeviceId)
- .toString();
- }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 821e744..dd968ce 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -23,7 +23,6 @@
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
-import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
@@ -32,9 +31,8 @@
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.onlab.osgi.DefaultServiceDirectory;
-import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
-import org.onosproject.net.DeviceId;
+import org.onosproject.grpc.ctl.AbstractGrpcClient;
import org.onosproject.net.pi.model.PiActionProfileId;
import org.onosproject.net.pi.model.PiCounterId;
import org.onosproject.net.pi.model.PiMeterId;
@@ -52,8 +50,8 @@
import org.onosproject.net.pi.runtime.PiTableEntry;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
-import org.slf4j.Logger;
import p4.config.v1.P4InfoOuterClass.P4Info;
import p4.tmp.P4Config;
import p4.v1.P4RuntimeGrpc;
@@ -87,22 +85,13 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.PACKET_REPLICATION_ENGINE_ENTRY;
@@ -115,10 +104,7 @@
/**
* Implementation of a P4Runtime client.
*/
-final class P4RuntimeClientImpl implements P4RuntimeClient {
-
- // Timeout in seconds to obtain the request lock.
- private static final int LOCK_TIMEOUT = 60;
+final class P4RuntimeClientImpl extends AbstractGrpcClient implements P4RuntimeClient {
private static final Metadata.Key<com.google.rpc.Status> STATUS_DETAILS_KEY =
Metadata.Key.of("grpc-status-details-bin",
@@ -132,18 +118,9 @@
WriteOperationType.DELETE, Update.Type.DELETE
);
- private final Logger log = getLogger(getClass());
-
- private final Lock requestLock = new ReentrantLock();
- private final Context.CancellableContext cancellableContext =
- Context.current().withCancellation();
-
- private final DeviceId deviceId;
private final long p4DeviceId;
private final P4RuntimeControllerImpl controller;
private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
- private final ExecutorService executorService;
- private final Executor contextExecutor;
private StreamChannelManager streamChannelManager;
// Used by this client for write requests.
@@ -154,70 +131,22 @@
/**
* Default constructor.
*
- * @param deviceId the ONOS device id
- * @param p4DeviceId the P4 device id
+ * @param clientKey the client key of this client
* @param channel gRPC channel
* @param controller runtime client controller
*/
- P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
+ P4RuntimeClientImpl(P4RuntimeClientKey clientKey, ManagedChannel channel,
P4RuntimeControllerImpl controller) {
- this.deviceId = deviceId;
- this.p4DeviceId = p4DeviceId;
+
+ super(clientKey, channel);
+ this.p4DeviceId = clientKey.p4DeviceId();
this.controller = controller;
- this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
- "onos-p4runtime-client-" + deviceId.toString(), "%d"));
- this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
+
//TODO Investigate use of stub deadlines instead of timeout in supplyInContext
this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
this.streamChannelManager = new StreamChannelManager(channel);
}
- /**
- * Submits a task for async execution via the given executor. All tasks
- * submitted with this method will be executed sequentially.
- */
- private <U> CompletableFuture<U> supplyWithExecutor(
- Supplier<U> supplier, String opDescription, Executor executor) {
- return CompletableFuture.supplyAsync(() -> {
- // TODO: explore a more relaxed locking strategy.
- try {
- if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
- log.error("LOCK TIMEOUT! This is likely a deadlock, "
- + "please debug (executing {})",
- opDescription);
- throw new IllegalThreadStateException("Lock timeout");
- }
- } catch (InterruptedException e) {
- log.warn("Thread interrupted while waiting for lock (executing {})",
- opDescription);
- throw new IllegalStateException(e);
- }
- try {
- return supplier.get();
- } catch (StatusRuntimeException ex) {
- log.warn("Unable to execute {} on {}: {}",
- opDescription, deviceId, ex.toString());
- throw ex;
- } catch (Throwable ex) {
- log.error("Exception in client of {}, executing {}",
- deviceId, opDescription, ex);
- throw ex;
- } finally {
- requestLock.unlock();
- }
- }, executor);
- }
-
- /**
- * Equivalent of supplyWithExecutor using the gRPC context executor of this
- * client, such that if the context is cancelled (e.g. client shutdown) the
- * RPC is automatically cancelled.
- */
- private <U> CompletableFuture<U> supplyInContext(
- Supplier<U> supplier, String opDescription) {
- return supplyWithExecutor(supplier, opDescription, contextExecutor);
- }
-
@Override
public CompletableFuture<Boolean> startStreamChannel() {
return supplyInContext(() -> sendMasterArbitrationUpdate(false),
@@ -225,12 +154,6 @@
}
@Override
- public CompletableFuture<Void> shutdown() {
- return supplyWithExecutor(this::doShutdown, "shutdown",
- SharedExecutors.getPoolThreadExecutor());
- }
-
- @Override
public CompletableFuture<Boolean> becomeMaster() {
return supplyInContext(() -> sendMasterArbitrationUpdate(true),
"becomeMaster");
@@ -1154,19 +1077,9 @@
.build();
}
- private Void doShutdown() {
- log.debug("Shutting down client for {}...", deviceId);
+ protected Void doShutdown() {
streamChannelManager.complete();
- cancellableContext.cancel(new InterruptedException(
- "Requested client shutdown"));
- this.executorService.shutdownNow();
- try {
- executorService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("Executor service didn't shutdown in time.");
- Thread.currentThread().interrupt();
- }
- return null;
+ return super.doShutdown();
}
// Returns the collection of succesfully write entities.
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 953f1ff..a287653 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -19,16 +19,13 @@
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Striped;
import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.netty.NettyChannelBuilder;
-import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.api.GrpcController;
+import org.onosproject.grpc.ctl.AbstractGrpcClientController;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceAgentEvent;
import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.api.P4RuntimeEventListener;
@@ -40,14 +37,10 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
-import java.io.IOException;
import java.math.BigInteger;
-import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
-import java.util.function.Supplier;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
@@ -56,19 +49,12 @@
*/
@Component(immediate = true, service = P4RuntimeController.class)
public class P4RuntimeControllerImpl
- extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
+ extends AbstractGrpcClientController
+ <P4RuntimeClientKey, P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
implements P4RuntimeController {
- // Getting the pipeline config from the device can take tens of MBs.
- private static final int MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
- private static final int MEGABYTES = 1024 * 1024;
-
private final Logger log = getLogger(getClass());
- private final Map<DeviceId, ClientKey> clientKeys = Maps.newHashMap();
- private final Map<ClientKey, P4RuntimeClient> clients = Maps.newHashMap();
- private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
-
private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
deviceAgentListeners = Maps.newConcurrentMap();
private final Striped<Lock> stripedLocks = Striped.lock(30);
@@ -76,151 +62,28 @@
private DistributedElectionIdGenerator electionIdGenerator;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- private GrpcController grpcController;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
private StorageService storageService;
@Activate
public void activate() {
+ super.activate();
eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
electionIdGenerator = new DistributedElectionIdGenerator(storageService);
log.info("Started");
}
-
@Deactivate
public void deactivate() {
- clientKeys.keySet().forEach(this::removeClient);
- clientKeys.clear();
- clients.clear();
- channelIds.clear();
+ super.deactivate();
deviceAgentListeners.clear();
- grpcController = null;
electionIdGenerator.destroy();
electionIdGenerator = null;
- eventDispatcher.removeSink(P4RuntimeEvent.class);
log.info("Stopped");
}
@Override
- public boolean createClient(DeviceId deviceId, String serverAddr,
- int serverPort, long p4DeviceId) {
- checkNotNull(deviceId);
- checkNotNull(serverAddr);
- checkArgument(serverPort > 0, "Invalid server port");
-
- return withDeviceLock(() -> doCreateClient(
- deviceId, serverAddr, serverPort, p4DeviceId), deviceId);
- }
-
- private boolean doCreateClient(DeviceId deviceId, String serverAddr,
- int serverPort, long p4DeviceId) {
-
- ClientKey clientKey = new ClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
-
- if (clientKeys.containsKey(deviceId)) {
- final ClientKey existingKey = clientKeys.get(deviceId);
- if (clientKey.equals(existingKey)) {
- log.debug("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
- deviceId, serverAddr, serverPort, p4DeviceId);
- return true;
- } else {
- log.info("Requested client for {} with new " +
- "endpoint, removing old client (server={}:{}, " +
- "p4DeviceId={})...",
- deviceId, existingKey.serverAddr(),
- existingKey.serverPort(), existingKey.p4DeviceId());
- doRemoveClient(deviceId);
- }
- }
-
- log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
- deviceId, serverAddr, serverPort, p4DeviceId);
-
- GrpcChannelId channelId = GrpcChannelId.of(
- clientKey.deviceId(), "p4runtime-" + clientKey);
-
- ManagedChannelBuilder channelBuilder = NettyChannelBuilder
- .forAddress(serverAddr, serverPort)
- .maxInboundMessageSize(MAX_INBOUND_MSG_SIZE * MEGABYTES)
- .usePlaintext();
-
- ManagedChannel channel;
- try {
- channel = grpcController.connectChannel(channelId, channelBuilder);
- } catch (IOException e) {
- log.warn("Unable to connect to gRPC server of {}: {}",
- clientKey.deviceId(), e.getMessage());
- return false;
- }
-
- P4RuntimeClient client = new P4RuntimeClientImpl(
- clientKey.deviceId(), clientKey.p4DeviceId(), channel, this);
-
- clientKeys.put(clientKey.deviceId(), clientKey);
- clients.put(clientKey, client);
- channelIds.put(clientKey.deviceId(), channelId);
-
- return true;
- }
-
- @Override
- public P4RuntimeClient getClient(DeviceId deviceId) {
- if (deviceId == null) {
- return null;
- }
- return withDeviceLock(() -> doGetClient(deviceId), deviceId);
- }
-
- private P4RuntimeClient doGetClient(DeviceId deviceId) {
- if (!clientKeys.containsKey(deviceId)) {
- return null;
- } else {
- return clients.get(clientKeys.get(deviceId));
- }
- }
-
- @Override
- public void removeClient(DeviceId deviceId) {
- if (deviceId == null) {
- return;
- }
- withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
- }
-
- private Void doRemoveClient(DeviceId deviceId) {
- if (clientKeys.containsKey(deviceId)) {
- final ClientKey clientKey = clientKeys.get(deviceId);
- clients.get(clientKey).shutdown();
- grpcController.disconnectChannel(channelIds.get(deviceId));
- clientKeys.remove(deviceId);
- clients.remove(clientKey);
- channelIds.remove(deviceId);
- }
- return null;
- }
-
- @Override
- public boolean hasClient(DeviceId deviceId) {
- return clientKeys.containsKey(deviceId);
- }
-
- @Override
- public boolean isReachable(DeviceId deviceId) {
- if (deviceId == null) {
- return false;
- }
- return withDeviceLock(() -> doIsReacheable(deviceId), deviceId);
- }
-
- private boolean doIsReacheable(DeviceId deviceId) {
- // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
- if (!clientKeys.containsKey(deviceId)) {
- log.debug("No client for {}, can't check for reachability", deviceId);
- return false;
- }
- return grpcController.isChannelOpen(channelIds.get(deviceId));
+ protected P4RuntimeClient createClientInstance(P4RuntimeClientKey clientKey, ManagedChannel channel) {
+ return new P4RuntimeClientImpl(clientKey, channel, this);
}
@Override
@@ -242,16 +105,6 @@
});
}
- private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
- final Lock lock = stripedLocks.get(deviceId);
- lock.lock();
- try {
- return task.get();
- } finally {
- lock.unlock();
- }
- }
-
BigInteger newMasterElectionId(DeviceId deviceId) {
return electionIdGenerator.generate(deviceId);
}
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index 36e60ea..301d5e1 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -44,6 +44,7 @@
import org.onosproject.net.pi.runtime.PiActionGroupMember;
import org.onosproject.net.pi.runtime.PiActionGroupMemberId;
import org.onosproject.net.pi.runtime.PiActionParam;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
import p4.v1.P4RuntimeOuterClass.Entity;
@@ -100,6 +101,8 @@
private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest";
private static final long DEFAULT_TIMEOUT_TIME = 10;
private static final Uint128 DEFAULT_ELECTION_ID = Uint128.newBuilder().setLow(1).build();
+ private static final String P4R_IP = "127.0.0.1";
+ private static final int P4R_PORT = 50010;
private P4RuntimeClientImpl client;
private P4RuntimeControllerImpl controller;
@@ -152,9 +155,8 @@
@Before
public void setup() {
controller = niceMock(P4RuntimeControllerImpl.class);
- client = new P4RuntimeClientImpl(DEVICE_ID, P4_DEVICE_ID,
- grpcChannel,
- controller);
+ P4RuntimeClientKey clientKey = new P4RuntimeClientKey(DEVICE_ID, P4R_IP, P4R_PORT, P4_DEVICE_ID);
+ client = new P4RuntimeClientImpl(clientKey, grpcChannel, controller);
client.becomeMaster();
}
diff --git a/providers/p4runtime/packet/BUILD b/providers/p4runtime/packet/BUILD
index 8a83273..a1c3f5d 100644
--- a/providers/p4runtime/packet/BUILD
+++ b/providers/p4runtime/packet/BUILD
@@ -1,4 +1,5 @@
COMPILE_DEPS = CORE_DEPS + [
+ "//protocols/grpc/api:onos-protocols-grpc-api",
"//protocols/p4runtime/api:onos-protocols-p4runtime-api",
]