Andrea Campanella | bf9e5ce | 2017-12-06 14:26:36 +0100 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2017-present Open Networking Foundation |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package org.onosproject.drivers.gnmi; |
| 18 | |
| 19 | import com.google.common.collect.ImmutableList; |
| 20 | import gnmi.gNMIGrpc; |
| 21 | import io.grpc.ManagedChannel; |
| 22 | import io.grpc.ManagedChannelBuilder; |
| 23 | import io.grpc.Status; |
| 24 | import io.grpc.StatusRuntimeException; |
Andrea Campanella | bf9e5ce | 2017-12-06 14:26:36 +0100 | [diff] [blame] | 25 | import io.grpc.netty.NettyChannelBuilder; |
| 26 | import io.grpc.stub.StreamObserver; |
| 27 | import org.onosproject.grpc.api.GrpcChannelId; |
| 28 | import org.onosproject.grpc.api.GrpcController; |
| 29 | import org.onosproject.net.DefaultAnnotations; |
| 30 | import org.onosproject.net.DeviceId; |
| 31 | import org.onosproject.net.Port; |
| 32 | import org.onosproject.net.PortNumber; |
| 33 | import org.onosproject.net.device.DefaultPortDescription; |
| 34 | import org.onosproject.net.device.DeviceDescription; |
| 35 | import org.onosproject.net.device.DeviceDescriptionDiscovery; |
| 36 | import org.onosproject.net.device.PortDescription; |
| 37 | import org.onosproject.net.driver.AbstractHandlerBehaviour; |
| 38 | import org.slf4j.Logger; |
| 39 | import org.slf4j.LoggerFactory; |
| 40 | |
| 41 | import java.io.IOException; |
| 42 | import java.nio.charset.Charset; |
| 43 | import java.util.ArrayList; |
| 44 | import java.util.HashMap; |
| 45 | import java.util.List; |
| 46 | import java.util.Map; |
| 47 | import java.util.concurrent.CompletableFuture; |
| 48 | import java.util.concurrent.ExecutionException; |
| 49 | import java.util.concurrent.TimeUnit; |
| 50 | import java.util.concurrent.TimeoutException; |
| 51 | |
| 52 | import static gnmi.Gnmi.Path; |
| 53 | import static gnmi.Gnmi.PathElem; |
| 54 | import static gnmi.Gnmi.SubscribeRequest; |
| 55 | import static gnmi.Gnmi.SubscribeResponse; |
| 56 | import static gnmi.Gnmi.Subscription; |
| 57 | import static gnmi.Gnmi.SubscriptionList; |
| 58 | import static gnmi.Gnmi.Update; |
| 59 | |
| 60 | /** |
| 61 | * Class that discovers the device description and ports of a device that |
| 62 | * supports the gNMI protocol and Openconfig models. |
| 63 | */ |
| 64 | public class GnmiDeviceDescriptionDiscovery |
| 65 | extends AbstractHandlerBehaviour |
| 66 | implements DeviceDescriptionDiscovery { |
| 67 | |
| 68 | private static final int REQUEST_TIMEOUT_SECONDS = 5; |
| 69 | |
| 70 | private static final Logger log = LoggerFactory |
| 71 | .getLogger(GnmiDeviceDescriptionDiscovery.class); |
| 72 | |
| 73 | private static final String GNMI_SERVER_ADDR_KEY = "gnmi_ip"; |
| 74 | private static final String GNMI_SERVER_PORT_KEY = "gnmi_port"; |
| 75 | |
| 76 | @Override |
| 77 | public DeviceDescription discoverDeviceDetails() { |
| 78 | return null; |
| 79 | } |
| 80 | |
| 81 | @Override |
| 82 | public List<PortDescription> discoverPortDetails() { |
| 83 | log.info("Discovering port details on device {}", handler().data().deviceId()); |
| 84 | |
Andrea Campanella | de76c2c | 2018-01-31 19:06:49 +0100 | [diff] [blame] | 85 | String serverAddr = this.data().value(GNMI_SERVER_ADDR_KEY); |
| 86 | String serverPortString = this.data().value(GNMI_SERVER_PORT_KEY); |
| 87 | |
| 88 | if (serverAddr == null || serverPortString == null || |
| 89 | serverAddr.isEmpty() || serverPortString.isEmpty()) { |
| 90 | log.warn("gNMI server information not provided, can't discover ports"); |
| 91 | return ImmutableList.of(); |
| 92 | } |
| 93 | |
Andrea Campanella | bf9e5ce | 2017-12-06 14:26:36 +0100 | [diff] [blame] | 94 | // Get the channel |
Andrea Campanella | de76c2c | 2018-01-31 19:06:49 +0100 | [diff] [blame] | 95 | ManagedChannel channel = getChannel(serverAddr, serverPortString); |
Andrea Campanella | bf9e5ce | 2017-12-06 14:26:36 +0100 | [diff] [blame] | 96 | |
| 97 | if (channel == null) { |
| 98 | return ImmutableList.of(); |
| 99 | } |
| 100 | |
| 101 | // Build the subscribe request |
| 102 | SubscribeRequest request = subscribeRequest(); |
| 103 | |
| 104 | // New stub |
| 105 | gNMIGrpc.gNMIStub gnmiStub = gNMIGrpc.newStub(channel); |
| 106 | |
| 107 | final CompletableFuture<List<PortDescription>> |
| 108 | reply = new CompletableFuture<>(); |
| 109 | |
| 110 | // Subscribe to the replies |
| 111 | StreamObserver<SubscribeRequest> subscribeRequest = gnmiStub |
| 112 | .subscribe(new SubscribeResponseObserver(reply)); |
| 113 | log.debug("Interfaces request {}", request); |
| 114 | |
| 115 | List<PortDescription> ports; |
| 116 | try { |
| 117 | // Issue the request |
| 118 | subscribeRequest.onNext(request); |
| 119 | ports = reply.get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
| 120 | } catch (InterruptedException | ExecutionException | TimeoutException |
| 121 | | StatusRuntimeException e) { |
| 122 | log.warn("Unable to discover ports from {}: {}", |
Andrea Campanella | de76c2c | 2018-01-31 19:06:49 +0100 | [diff] [blame] | 123 | data().deviceId(), e.getMessage()); |
Andrea Campanella | bf9e5ce | 2017-12-06 14:26:36 +0100 | [diff] [blame] | 124 | log.debug("{}", e); |
| 125 | return ImmutableList.of(); |
| 126 | } finally { |
| 127 | subscribeRequest.onCompleted(); |
| 128 | } |
| 129 | |
| 130 | return ports; |
| 131 | } |
| 132 | |
| 133 | /** |
| 134 | * Obtains the ManagedChannel to be used for the communication. |
| 135 | * |
| 136 | * @return the managed channel |
| 137 | */ |
Andrea Campanella | de76c2c | 2018-01-31 19:06:49 +0100 | [diff] [blame] | 138 | private ManagedChannel getChannel(String serverAddr, String serverPortString) { |
Andrea Campanella | bf9e5ce | 2017-12-06 14:26:36 +0100 | [diff] [blame] | 139 | |
| 140 | DeviceId deviceId = handler().data().deviceId(); |
Andrea Campanella | bf9e5ce | 2017-12-06 14:26:36 +0100 | [diff] [blame] | 141 | |
| 142 | GrpcController controller = handler().get(GrpcController.class); |
| 143 | ManagedChannel channel = null; |
| 144 | |
| 145 | //FIXME can be optimized |
| 146 | //getting a channel if exists. |
| 147 | ManagedChannel managedChannel = controller |
| 148 | .getChannels(handler().data().deviceId()).stream().filter(c -> { |
| 149 | String[] authority = c.authority().split(":"); |
| 150 | String host = authority[0]; |
| 151 | String port = authority[1]; |
| 152 | return host.equals(serverAddr) && port.equals(serverPortString); |
| 153 | }).findAny().orElse(null); |
| 154 | |
| 155 | if (managedChannel != null) { |
| 156 | log.debug("Reusing Channel"); |
| 157 | channel = managedChannel; |
| 158 | } else { |
| 159 | log.debug("Creating Channel"); |
| 160 | GrpcChannelId newChannelId = GrpcChannelId.of(deviceId, "gnmi"); |
| 161 | |
| 162 | ManagedChannelBuilder channelBuilder = NettyChannelBuilder |
| 163 | .forAddress(serverAddr, Integer.valueOf(serverPortString)) |
Carmelo Cascone | 4164436 | 2018-08-09 16:56:43 -0700 | [diff] [blame] | 164 | .usePlaintext(true); |
Andrea Campanella | bf9e5ce | 2017-12-06 14:26:36 +0100 | [diff] [blame] | 165 | |
| 166 | try { |
| 167 | channel = controller.connectChannel(newChannelId, channelBuilder); |
| 168 | } catch (IOException e) { |
| 169 | log.warn("Unable to connect to gRPC server of {}: {}", |
Andrea Campanella | de76c2c | 2018-01-31 19:06:49 +0100 | [diff] [blame] | 170 | deviceId, e.getMessage()); |
Andrea Campanella | bf9e5ce | 2017-12-06 14:26:36 +0100 | [diff] [blame] | 171 | } |
| 172 | } |
| 173 | return channel; |
| 174 | } |
| 175 | |
| 176 | /** |
| 177 | * Creates the subscribe request for the interfaces. |
| 178 | * |
| 179 | * @return subscribe request |
| 180 | */ |
| 181 | private SubscribeRequest subscribeRequest() { |
| 182 | Path path = Path.newBuilder() |
| 183 | .addElem(PathElem.newBuilder().setName("interfaces").build()) |
| 184 | .addElem(PathElem.newBuilder().setName("interface").build()) |
| 185 | .addElem(PathElem.newBuilder().setName("...").build()) |
| 186 | .build(); |
| 187 | Subscription subscription = Subscription.newBuilder().setPath(path).build(); |
| 188 | SubscriptionList list = SubscriptionList.newBuilder().setMode(SubscriptionList.Mode.ONCE) |
| 189 | .addSubscription(subscription).build(); |
| 190 | return SubscribeRequest.newBuilder().setSubscribe(list).build(); |
| 191 | } |
| 192 | |
| 193 | /** |
| 194 | * Handles messages received from the device on the stream channel. |
| 195 | */ |
| 196 | private final class SubscribeResponseObserver |
| 197 | implements StreamObserver<SubscribeResponse> { |
| 198 | |
| 199 | private final CompletableFuture<List<PortDescription>> reply; |
| 200 | |
| 201 | private SubscribeResponseObserver(CompletableFuture<List<PortDescription>> reply) { |
| 202 | this.reply = reply; |
| 203 | } |
| 204 | |
| 205 | @Override |
| 206 | public void onNext(SubscribeResponse message) { |
| 207 | Map<String, DefaultPortDescription.Builder> ports = new HashMap<>(); |
| 208 | Map<String, DefaultAnnotations.Builder> portsAnnotations = new HashMap<>(); |
| 209 | log.debug("Response {} ", message.getUpdate().toString()); |
| 210 | message.getUpdate().getUpdateList().forEach(update -> { |
| 211 | parseUpdate(ports, portsAnnotations, update); |
| 212 | }); |
| 213 | |
| 214 | List<PortDescription> portDescriptionList = new ArrayList<>(); |
| 215 | ports.forEach((k, v) -> { |
| 216 | // v.portSpeed(1000L); |
| 217 | v.type(Port.Type.COPPER); |
| 218 | v.annotations(portsAnnotations.get(k).set("name", k).build()); |
| 219 | portDescriptionList.add(v.build()); |
| 220 | }); |
| 221 | |
| 222 | reply.complete(portDescriptionList); |
| 223 | } |
| 224 | |
| 225 | |
| 226 | @Override |
| 227 | public void onError(Throwable throwable) { |
| 228 | log.warn("Error on stream channel for {}: {}", |
Andrea Campanella | de76c2c | 2018-01-31 19:06:49 +0100 | [diff] [blame] | 229 | data().deviceId(), Status.fromThrowable(throwable)); |
Andrea Campanella | bf9e5ce | 2017-12-06 14:26:36 +0100 | [diff] [blame] | 230 | log.debug("{}", throwable); |
| 231 | } |
| 232 | |
| 233 | @Override |
| 234 | public void onCompleted() { |
| 235 | log.debug("SubscribeResponseObserver completed"); |
| 236 | } |
| 237 | } |
| 238 | |
| 239 | /** |
| 240 | * Parses the update received from the device. |
| 241 | * |
| 242 | * @param ports the ports description to build |
| 243 | * @param portsAnnotations the ports annotations list to populate |
| 244 | * @param update the update received |
| 245 | */ |
| 246 | private void parseUpdate(Map<String, DefaultPortDescription.Builder> ports, |
| 247 | Map<String, DefaultAnnotations.Builder> portsAnnotations, |
| 248 | Update update) { |
| 249 | |
| 250 | //FIXME crude parsing, can be done via object (de)serialization |
| 251 | if (update.getPath().getElemList().size() > 3) { |
| 252 | String name = update.getPath().getElem(3).getName(); |
| 253 | String portId = update.getPath().getElem(1).getKeyMap().get("name"); |
| 254 | if (!ports.containsKey(portId)) { |
| 255 | int number = Character.getNumericValue(portId.charAt(portId.length() - 1)); |
| 256 | PortNumber portNumber = PortNumber.portNumber(number, portId); |
| 257 | ports.put(portId, DefaultPortDescription.builder() |
Yuta HIGUCHI | 53e4796 | 2018-03-01 23:50:48 -0800 | [diff] [blame] | 258 | .withPortNumber(portNumber)); |
Andrea Campanella | bf9e5ce | 2017-12-06 14:26:36 +0100 | [diff] [blame] | 259 | } |
| 260 | if (name.equals("enabled")) { |
| 261 | DefaultPortDescription.Builder builder = ports.get(portId); |
| 262 | builder = builder.isEnabled(update.getVal().getBoolVal()); |
| 263 | ports.put(portId, builder); |
| 264 | } else if (name.equals("state")) { |
| 265 | String speedName = update.getPath().getElem(4).getName(); |
| 266 | if (speedName.equals("negotiated-port-speed")) { |
| 267 | DefaultPortDescription.Builder builder = ports.get(portId); |
| 268 | long speed = parsePortSpeed(update.getVal().getStringVal()); |
| 269 | builder = builder.portSpeed(speed); |
| 270 | ports.put(portId, builder); |
| 271 | } |
| 272 | } else if (!name.equals("ifindex")) { |
| 273 | if (!portsAnnotations.containsKey(portId)) { |
| 274 | portsAnnotations.put(portId, DefaultAnnotations.builder() |
| 275 | .set(name, update.getVal().toByteString() |
| 276 | .toString(Charset.defaultCharset()).trim())); |
| 277 | } else { |
| 278 | DefaultAnnotations.Builder builder = portsAnnotations.get(portId); |
| 279 | builder = builder.set(name, update.getVal().toByteString(). |
| 280 | toString(Charset.defaultCharset()).trim()); |
| 281 | portsAnnotations.put(portId, builder); |
| 282 | } |
| 283 | } |
| 284 | } |
| 285 | } |
| 286 | |
| 287 | private long parsePortSpeed(String speed) { |
| 288 | log.debug("Speed from config {}", speed); |
| 289 | switch (speed) { |
| 290 | case "SPEED_10MB": |
| 291 | return 10; |
| 292 | case "SPEED_100MB": |
柯志勇10068695 | f66a64f | 2018-10-18 19:26:53 +0800 | [diff] [blame] | 293 | return 100; |
Andrea Campanella | bf9e5ce | 2017-12-06 14:26:36 +0100 | [diff] [blame] | 294 | case "SPEED_1GB": |
| 295 | return 1000; |
| 296 | case "SPEED_10GB": |
| 297 | return 10000; |
| 298 | case "SPEED_25GB": |
| 299 | return 25000; |
| 300 | case "SPEED_40GB": |
| 301 | return 40000; |
| 302 | case "SPEED_50GB": |
| 303 | return 50000; |
| 304 | case "SPEED_100GB": |
| 305 | return 100000; |
| 306 | default: |
| 307 | return 1000; |
| 308 | } |
| 309 | } |
| 310 | } |