Andrea Campanella | bf9e5ce | 2017-12-06 14:26:36 +0100 | [diff] [blame^] | 1 | /* |
| 2 | * Copyright 2017-present Open Networking Foundation |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package org.onosproject.drivers.gnmi; |
| 18 | |
| 19 | import com.google.common.collect.ImmutableList; |
| 20 | import gnmi.gNMIGrpc; |
| 21 | import io.grpc.ManagedChannel; |
| 22 | import io.grpc.ManagedChannelBuilder; |
| 23 | import io.grpc.Status; |
| 24 | import io.grpc.StatusRuntimeException; |
| 25 | import io.grpc.internal.DnsNameResolverProvider; |
| 26 | import io.grpc.netty.NettyChannelBuilder; |
| 27 | import io.grpc.stub.StreamObserver; |
| 28 | import org.onosproject.grpc.api.GrpcChannelId; |
| 29 | import org.onosproject.grpc.api.GrpcController; |
| 30 | import org.onosproject.net.DefaultAnnotations; |
| 31 | import org.onosproject.net.DeviceId; |
| 32 | import org.onosproject.net.Port; |
| 33 | import org.onosproject.net.PortNumber; |
| 34 | import org.onosproject.net.device.DefaultPortDescription; |
| 35 | import org.onosproject.net.device.DeviceDescription; |
| 36 | import org.onosproject.net.device.DeviceDescriptionDiscovery; |
| 37 | import org.onosproject.net.device.PortDescription; |
| 38 | import org.onosproject.net.driver.AbstractHandlerBehaviour; |
| 39 | import org.slf4j.Logger; |
| 40 | import org.slf4j.LoggerFactory; |
| 41 | |
| 42 | import java.io.IOException; |
| 43 | import java.nio.charset.Charset; |
| 44 | import java.util.ArrayList; |
| 45 | import java.util.HashMap; |
| 46 | import java.util.List; |
| 47 | import java.util.Map; |
| 48 | import java.util.concurrent.CompletableFuture; |
| 49 | import java.util.concurrent.ExecutionException; |
| 50 | import java.util.concurrent.TimeUnit; |
| 51 | import java.util.concurrent.TimeoutException; |
| 52 | |
| 53 | import static gnmi.Gnmi.Path; |
| 54 | import static gnmi.Gnmi.PathElem; |
| 55 | import static gnmi.Gnmi.SubscribeRequest; |
| 56 | import static gnmi.Gnmi.SubscribeResponse; |
| 57 | import static gnmi.Gnmi.Subscription; |
| 58 | import static gnmi.Gnmi.SubscriptionList; |
| 59 | import static gnmi.Gnmi.Update; |
| 60 | |
| 61 | /** |
| 62 | * Class that discovers the device description and ports of a device that |
| 63 | * supports the gNMI protocol and Openconfig models. |
| 64 | */ |
| 65 | public class GnmiDeviceDescriptionDiscovery |
| 66 | extends AbstractHandlerBehaviour |
| 67 | implements DeviceDescriptionDiscovery { |
| 68 | |
| 69 | private static final int REQUEST_TIMEOUT_SECONDS = 5; |
| 70 | |
| 71 | private static final Logger log = LoggerFactory |
| 72 | .getLogger(GnmiDeviceDescriptionDiscovery.class); |
| 73 | |
| 74 | private static final String GNMI_SERVER_ADDR_KEY = "gnmi_ip"; |
| 75 | private static final String GNMI_SERVER_PORT_KEY = "gnmi_port"; |
| 76 | |
| 77 | @Override |
| 78 | public DeviceDescription discoverDeviceDetails() { |
| 79 | return null; |
| 80 | } |
| 81 | |
| 82 | @Override |
| 83 | public List<PortDescription> discoverPortDetails() { |
| 84 | log.info("Discovering port details on device {}", handler().data().deviceId()); |
| 85 | |
| 86 | // Get the channel |
| 87 | ManagedChannel channel = getChannel(); |
| 88 | |
| 89 | if (channel == null) { |
| 90 | return ImmutableList.of(); |
| 91 | } |
| 92 | |
| 93 | // Build the subscribe request |
| 94 | SubscribeRequest request = subscribeRequest(); |
| 95 | |
| 96 | // New stub |
| 97 | gNMIGrpc.gNMIStub gnmiStub = gNMIGrpc.newStub(channel); |
| 98 | |
| 99 | final CompletableFuture<List<PortDescription>> |
| 100 | reply = new CompletableFuture<>(); |
| 101 | |
| 102 | // Subscribe to the replies |
| 103 | StreamObserver<SubscribeRequest> subscribeRequest = gnmiStub |
| 104 | .subscribe(new SubscribeResponseObserver(reply)); |
| 105 | log.debug("Interfaces request {}", request); |
| 106 | |
| 107 | List<PortDescription> ports; |
| 108 | try { |
| 109 | // Issue the request |
| 110 | subscribeRequest.onNext(request); |
| 111 | ports = reply.get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
| 112 | } catch (InterruptedException | ExecutionException | TimeoutException |
| 113 | | StatusRuntimeException e) { |
| 114 | log.warn("Unable to discover ports from {}: {}", |
| 115 | data().deviceId(), e.getMessage()); |
| 116 | log.debug("{}", e); |
| 117 | return ImmutableList.of(); |
| 118 | } finally { |
| 119 | subscribeRequest.onCompleted(); |
| 120 | } |
| 121 | |
| 122 | return ports; |
| 123 | } |
| 124 | |
| 125 | /** |
| 126 | * Obtains the ManagedChannel to be used for the communication. |
| 127 | * |
| 128 | * @return the managed channel |
| 129 | */ |
| 130 | private ManagedChannel getChannel() { |
| 131 | |
| 132 | DeviceId deviceId = handler().data().deviceId(); |
| 133 | String serverAddr = this.data().value(GNMI_SERVER_ADDR_KEY); |
| 134 | String serverPortString = this.data().value(GNMI_SERVER_PORT_KEY); |
| 135 | |
| 136 | GrpcController controller = handler().get(GrpcController.class); |
| 137 | ManagedChannel channel = null; |
| 138 | |
| 139 | //FIXME can be optimized |
| 140 | //getting a channel if exists. |
| 141 | ManagedChannel managedChannel = controller |
| 142 | .getChannels(handler().data().deviceId()).stream().filter(c -> { |
| 143 | String[] authority = c.authority().split(":"); |
| 144 | String host = authority[0]; |
| 145 | String port = authority[1]; |
| 146 | return host.equals(serverAddr) && port.equals(serverPortString); |
| 147 | }).findAny().orElse(null); |
| 148 | |
| 149 | if (managedChannel != null) { |
| 150 | log.debug("Reusing Channel"); |
| 151 | channel = managedChannel; |
| 152 | } else { |
| 153 | log.debug("Creating Channel"); |
| 154 | GrpcChannelId newChannelId = GrpcChannelId.of(deviceId, "gnmi"); |
| 155 | |
| 156 | ManagedChannelBuilder channelBuilder = NettyChannelBuilder |
| 157 | .forAddress(serverAddr, Integer.valueOf(serverPortString)) |
| 158 | .usePlaintext(true) |
| 159 | .nameResolverFactory(new DnsNameResolverProvider()); |
| 160 | |
| 161 | try { |
| 162 | channel = controller.connectChannel(newChannelId, channelBuilder); |
| 163 | } catch (IOException e) { |
| 164 | log.warn("Unable to connect to gRPC server of {}: {}", |
| 165 | deviceId, e.getMessage()); |
| 166 | } |
| 167 | } |
| 168 | return channel; |
| 169 | } |
| 170 | |
| 171 | /** |
| 172 | * Creates the subscribe request for the interfaces. |
| 173 | * |
| 174 | * @return subscribe request |
| 175 | */ |
| 176 | private SubscribeRequest subscribeRequest() { |
| 177 | Path path = Path.newBuilder() |
| 178 | .addElem(PathElem.newBuilder().setName("interfaces").build()) |
| 179 | .addElem(PathElem.newBuilder().setName("interface").build()) |
| 180 | .addElem(PathElem.newBuilder().setName("...").build()) |
| 181 | .build(); |
| 182 | Subscription subscription = Subscription.newBuilder().setPath(path).build(); |
| 183 | SubscriptionList list = SubscriptionList.newBuilder().setMode(SubscriptionList.Mode.ONCE) |
| 184 | .addSubscription(subscription).build(); |
| 185 | return SubscribeRequest.newBuilder().setSubscribe(list).build(); |
| 186 | } |
| 187 | |
| 188 | /** |
| 189 | * Handles messages received from the device on the stream channel. |
| 190 | */ |
| 191 | private final class SubscribeResponseObserver |
| 192 | implements StreamObserver<SubscribeResponse> { |
| 193 | |
| 194 | private final CompletableFuture<List<PortDescription>> reply; |
| 195 | |
| 196 | private SubscribeResponseObserver(CompletableFuture<List<PortDescription>> reply) { |
| 197 | this.reply = reply; |
| 198 | } |
| 199 | |
| 200 | @Override |
| 201 | public void onNext(SubscribeResponse message) { |
| 202 | Map<String, DefaultPortDescription.Builder> ports = new HashMap<>(); |
| 203 | Map<String, DefaultAnnotations.Builder> portsAnnotations = new HashMap<>(); |
| 204 | log.debug("Response {} ", message.getUpdate().toString()); |
| 205 | message.getUpdate().getUpdateList().forEach(update -> { |
| 206 | parseUpdate(ports, portsAnnotations, update); |
| 207 | }); |
| 208 | |
| 209 | List<PortDescription> portDescriptionList = new ArrayList<>(); |
| 210 | ports.forEach((k, v) -> { |
| 211 | // v.portSpeed(1000L); |
| 212 | v.type(Port.Type.COPPER); |
| 213 | v.annotations(portsAnnotations.get(k).set("name", k).build()); |
| 214 | portDescriptionList.add(v.build()); |
| 215 | }); |
| 216 | |
| 217 | reply.complete(portDescriptionList); |
| 218 | } |
| 219 | |
| 220 | |
| 221 | @Override |
| 222 | public void onError(Throwable throwable) { |
| 223 | log.warn("Error on stream channel for {}: {}", |
| 224 | data().deviceId(), Status.fromThrowable(throwable)); |
| 225 | log.debug("{}", throwable); |
| 226 | } |
| 227 | |
| 228 | @Override |
| 229 | public void onCompleted() { |
| 230 | log.debug("SubscribeResponseObserver completed"); |
| 231 | } |
| 232 | } |
| 233 | |
| 234 | /** |
| 235 | * Parses the update received from the device. |
| 236 | * |
| 237 | * @param ports the ports description to build |
| 238 | * @param portsAnnotations the ports annotations list to populate |
| 239 | * @param update the update received |
| 240 | */ |
| 241 | private void parseUpdate(Map<String, DefaultPortDescription.Builder> ports, |
| 242 | Map<String, DefaultAnnotations.Builder> portsAnnotations, |
| 243 | Update update) { |
| 244 | |
| 245 | //FIXME crude parsing, can be done via object (de)serialization |
| 246 | if (update.getPath().getElemList().size() > 3) { |
| 247 | String name = update.getPath().getElem(3).getName(); |
| 248 | String portId = update.getPath().getElem(1).getKeyMap().get("name"); |
| 249 | if (!ports.containsKey(portId)) { |
| 250 | int number = Character.getNumericValue(portId.charAt(portId.length() - 1)); |
| 251 | PortNumber portNumber = PortNumber.portNumber(number, portId); |
| 252 | ports.put(portId, DefaultPortDescription.builder() |
| 253 | .withPortNumer(portNumber)); |
| 254 | } |
| 255 | if (name.equals("enabled")) { |
| 256 | DefaultPortDescription.Builder builder = ports.get(portId); |
| 257 | builder = builder.isEnabled(update.getVal().getBoolVal()); |
| 258 | ports.put(portId, builder); |
| 259 | } else if (name.equals("state")) { |
| 260 | String speedName = update.getPath().getElem(4).getName(); |
| 261 | if (speedName.equals("negotiated-port-speed")) { |
| 262 | DefaultPortDescription.Builder builder = ports.get(portId); |
| 263 | long speed = parsePortSpeed(update.getVal().getStringVal()); |
| 264 | builder = builder.portSpeed(speed); |
| 265 | ports.put(portId, builder); |
| 266 | } |
| 267 | } else if (!name.equals("ifindex")) { |
| 268 | if (!portsAnnotations.containsKey(portId)) { |
| 269 | portsAnnotations.put(portId, DefaultAnnotations.builder() |
| 270 | .set(name, update.getVal().toByteString() |
| 271 | .toString(Charset.defaultCharset()).trim())); |
| 272 | } else { |
| 273 | DefaultAnnotations.Builder builder = portsAnnotations.get(portId); |
| 274 | builder = builder.set(name, update.getVal().toByteString(). |
| 275 | toString(Charset.defaultCharset()).trim()); |
| 276 | portsAnnotations.put(portId, builder); |
| 277 | } |
| 278 | } |
| 279 | } |
| 280 | } |
| 281 | |
| 282 | private long parsePortSpeed(String speed) { |
| 283 | log.debug("Speed from config {}", speed); |
| 284 | switch (speed) { |
| 285 | case "SPEED_10MB": |
| 286 | return 10; |
| 287 | case "SPEED_100MB": |
| 288 | return 10; |
| 289 | case "SPEED_1GB": |
| 290 | return 1000; |
| 291 | case "SPEED_10GB": |
| 292 | return 10000; |
| 293 | case "SPEED_25GB": |
| 294 | return 25000; |
| 295 | case "SPEED_40GB": |
| 296 | return 40000; |
| 297 | case "SPEED_50GB": |
| 298 | return 50000; |
| 299 | case "SPEED_100GB": |
| 300 | return 100000; |
| 301 | default: |
| 302 | return 1000; |
| 303 | } |
| 304 | } |
| 305 | } |