Inter-ONOS link discovery [ONOS-3459]

Enable the discovery of inter-domain links from the metro cluster using
fingerprinted link probes.

Change-Id: I52f2f0b2104456c3de77f35eefb97a1ab9ef65e6
diff --git a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchDeviceProvider.java b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchDeviceProvider.java
index de18186..0d9e5c3 100644
--- a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchDeviceProvider.java
+++ b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchDeviceProvider.java
@@ -16,7 +16,9 @@
 package org.onosproject.ecord.co;
 
 import org.apache.commons.lang3.tuple.Pair;
+
 import com.google.common.base.Strings;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -26,13 +28,19 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.glassfish.jersey.client.ClientProperties;
 import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.LLDPOrganizationalTLV;
+import org.onlab.packet.ONOSLLDP;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterMetadata;
+import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.incubator.rpc.RemoteServiceContext;
 import org.onosproject.incubator.rpc.RemoteServiceDirectory;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.Link;
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.config.ConfigFactory;
@@ -46,9 +54,17 @@
 import org.onosproject.net.device.DeviceProviderRegistry;
 import org.onosproject.net.device.DeviceProviderService;
 import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.link.DefaultLinkDescription;
+import org.onosproject.net.link.LinkProvider;
+import org.onosproject.net.link.LinkProviderRegistry;
+import org.onosproject.net.link.LinkProviderService;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
 import org.onosproject.net.provider.ProviderId;
 import org.slf4j.Logger;
 import org.osgi.service.component.ComponentContext;
+import org.onosproject.net.packet.DefaultOutboundPacket;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -56,6 +72,7 @@
 import java.net.URI;
 import java.util.Map;
 import java.util.Optional;
+import java.nio.ByteBuffer;
 
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
@@ -63,11 +80,19 @@
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import java.util.Dictionary;
 
+import java.util.Dictionary;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledFuture;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.cluster.ClusterMetadata.NO_NAME;
 import static org.onosproject.ecord.co.BigSwitchManager.REALIZED_BY;
 import static org.onosproject.net.config.basics.SubjectFactories.CONNECT_POINT_SUBJECT_FACTORY;
 import static org.slf4j.LoggerFactory.getLogger;
+import static org.onosproject.net.flow.DefaultTrafficTreatment.builder;
 
 /* To configure the BigSwitchDevice parameters at startup, add configuration to tools/package/config/component-cfg.json
    before using onos-package command.
@@ -76,17 +101,19 @@
      "org.onosproject.ecord.co.BigSwitchDeviceProvider": {
        "providerScheme": "bigswitch",
        "providerId": "org.onosproject.bigswitch",
-       "remoteUri": "local://localhost",
-       "metroIp": "localhost"
+       "remoteUri": "grpc://192.168.64.100:11984",
+       "metroIp": "192.168.64.100"
      }
    }
+   Note that you need the port number (11984) and the metroIp and the remoteUri
+   values should point to the same IP/host.
  */
 
 /**
  * Device provider which exposes a big switch abstraction of the underlying data path.
  */
 @Component(immediate = true)
-public class BigSwitchDeviceProvider implements DeviceProvider {
+public class BigSwitchDeviceProvider implements DeviceProvider, LinkProvider {
 
     private static final Logger LOG = getLogger(BigSwitchDeviceProvider.class);
 
@@ -99,6 +126,9 @@
     private static final String PROP_METRO_IP = "metroIp";
     private static final String DEFAULT_METRO_IP = "localhost";
 
+    // make this an ONOSLLDP utility value?
+    private static final String SRC_MAC = "DE:AD:BE:EF:BA:11";
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected BigSwitchService bigSwitchService;
 
@@ -114,12 +144,29 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService cfgService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PacketService packetService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterMetadataService metadataService;
+
+    private RemoteServiceContext remoteServiceContext;
     private BigSwitch bigSwitch;
     private DeviceDescription bigSwitchDescription;
-    private DeviceProviderRegistry providerRegistry;
-    private DeviceProviderService providerService;
+    private DeviceProviderRegistry deviceProviderRegistry;
+    private DeviceProviderService deviceProviderService;
+    private LinkProviderRegistry linkProviderRegistry;
+    private LinkProviderService linkProviderService;
+    private ScheduledExecutorService executor;
+    private ScheduledFuture<?> future;
+    private String fingerPrint;
+
     private BigSwitchListener listener = new InternalListener();
 
+    private PacketProcessor packetProcessor = new InternalPacketProcessor();
+
+    private final Ethernet ethPacket = new Ethernet();
+
     private final ConfigFactory<ConnectPoint, CrossConnectConfig> xcConfigFactory
         = new ConfigFactory<ConnectPoint, CrossConnectConfig>(CONNECT_POINT_SUBJECT_FACTORY,
                                                     CrossConnectConfig.class,
@@ -154,22 +201,36 @@
         loadRpcConfig(context);
         loadRestConfig(context);
 
+        // setup service to, and register with, providers
+        try {
+            remoteServiceContext = rpcService.get(URI.create(remoteUri));
+        } catch (UnsupportedOperationException e) {
+            LOG.warn("Unsupported URI: {}", remoteUri);
+        }
+        providerId = new ProviderId(schemeProp, idProp);
         registerToDeviceProvider();
+        executor = newSingleThreadScheduledExecutor(groupedThreads("onos/bigswitch", "discovery-%d"));
+        prepareProbe();
+        registerToLinkServices();
+
+        // start listening to config changes
         NetworkConfigListener cfglistener = new InternalConfigListener();
         cfgRegistry.addListener(cfglistener);
         cfgRegistry.registerConfigFactory(xcConfigFactory);
-
         LOG.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        packetService.removeProcessor(packetProcessor);
+
         cfgRegistry.unregisterConfigFactory(xcConfigFactory);
         cfgService.unregisterProperties(getClass(), false);
-
+        unregisterFromLinkServices();
+        executor.shutdownNow();
         unregisterFromDeviceProvider();
         // Won't hurt but necessary?
-        providerService = null;
+        deviceProviderService = null;
         providerId = null;
         LOG.info("Stopped");
     }
@@ -178,12 +239,18 @@
     public void modified(ComponentContext context) {
         // Needs re-registration to DeviceProvider
         if (loadRpcConfig(context)) {
-            // unregister from DeviceProvider with old parameters
+            // unregister from Device and Link Providers with old parameters
+            unregisterFromLinkServices();
             unregisterFromDeviceProvider();
-            // register to DeviceProvider with new parameters
-            registerToDeviceProvider();
-
-            LOG.info("Re-registered to DeviceProvider");
+            // register to Device and Link Providers with new parameters
+            try {
+                remoteServiceContext = rpcService.get(URI.create(remoteUri));
+                registerToDeviceProvider();
+                registerToLinkServices();
+            } catch (UnsupportedOperationException e) {
+                LOG.warn("Unsupported URI: {}", remoteUri);
+            }
+            LOG.info("Re-registered with Device and Link Providers");
         }
 
         // Needs to advertise cross-connect links
@@ -235,42 +302,48 @@
     }
 
     private void registerToDeviceProvider() {
-        RemoteServiceContext remoteServiceContext;
-        // TODO Can validation of the URI be done while loading configuration?
-        try {
-            remoteServiceContext = rpcService.get(URI.create(remoteUri));
-        } catch (UnsupportedOperationException e) {
-            LOG.error("Unsupported URI: {}", remoteUri);
-            return;
-        }
-
-        providerId = new ProviderId(schemeProp, idProp);
         // Create big switch device and description
         DeviceId deviceId = DeviceId.deviceId(schemeProp + ':' + clusterService.getLocalNode().ip());
         bigSwitch = new BigSwitch(deviceId, this.id());
         bigSwitchDescription = new DefaultDeviceDescription(bigSwitch.id().uri(),
                 bigSwitch.type(), bigSwitch.manufacturer(),
                 bigSwitch.hwVersion(), bigSwitch.swVersion(), bigSwitch.serialNumber(), bigSwitch.chassisId());
-        providerRegistry = remoteServiceContext.get(DeviceProviderRegistry.class);
-        providerService = providerRegistry.register(this);
+        deviceProviderRegistry = remoteServiceContext.get(DeviceProviderRegistry.class);
+        deviceProviderService = deviceProviderRegistry.register(this);
         // Start big switch service and register device
-        providerService.deviceConnected(bigSwitch.id(), bigSwitchDescription);
-        providerService.updatePorts(bigSwitch.id(), bigSwitchService.getPorts());
+        deviceProviderService.deviceConnected(bigSwitch.id(), bigSwitchDescription);
+        deviceProviderService.updatePorts(bigSwitch.id(), bigSwitchService.getPorts());
         advertiseCrossConnectLinksOnAllPorts();
         bigSwitchService.addListener(listener);
     }
 
+    private void registerToLinkServices() {
+        // Start link discovery -related functions
+        linkProviderRegistry = remoteServiceContext.get(LinkProviderRegistry.class);
+        linkProviderService = linkProviderRegistry.register(this);
+
+        future = executor.scheduleAtFixedRate(new DiscoveryTask(), 3, 3, TimeUnit.SECONDS);
+
+        // maybe also want a way to say 'get me next usable priority of class X'
+        packetService.addProcessor(packetProcessor, PacketProcessor.advisor(2));
+    }
+
     private void unregisterFromDeviceProvider() {
         if (bigSwitch == null) {
             LOG.warn("Invalid unregistration.");
             return;
         }
-        providerService.deviceDisconnected(bigSwitch.id());
-        providerRegistry.unregister(this);
-
+        deviceProviderService.deviceDisconnected(bigSwitch.id());
+        deviceProviderRegistry.unregister(this);
         bigSwitch = null;
     }
 
+    private void unregisterFromLinkServices() {
+        future.cancel(true);
+        packetService.removeProcessor(packetProcessor);
+        linkProviderRegistry.unregister(this);
+    }
+
     private ConnectPoint toConnectPoint(String strCp) {
         String[] split = strCp.split("/");
         if (split.length != 2) {
@@ -362,24 +435,31 @@
     }
 
     private void advertiseCrossConnectLinksOnAllPorts() {
-        bigSwitchService.getPorts().stream()
+        bigSwitchService.getPorts()
             .forEach(BigSwitchDeviceProvider.this::advertiseCrossConnectLinks);
     }
 
+    private void prepareProbe() {
+        ethPacket.setEtherType(Ethernet.TYPE_LLDP)
+                 .setDestinationMACAddress(ONOSLLDP.LLDP_NICIRA)
+                 .setSourceMACAddress(SRC_MAC)
+                 .setPad(true);
+    }
+
     private class InternalListener implements BigSwitchListener {
         @Override
         public void event(BigSwitchEvent event) {
             switch (event.type()) {
                 case PORT_ADDED:
                 case PORT_REMOVED:
-                    providerService.updatePorts(bigSwitch.id(), bigSwitchService.getPorts());
+                    deviceProviderService.updatePorts(bigSwitch.id(), bigSwitchService.getPorts());
                     // if the subject's underlying port was a cross connect port,
                     // advertise cross-connect link to Metro-ONOS view
                     advertiseCrossConnectLinks(event.subject());
                     break;
 
                 case PORT_UPDATED:
-                    providerService.portStatusChanged(bigSwitch.id(), event.subject());
+                    deviceProviderService.portStatusChanged(bigSwitch.id(), event.subject());
                     // if the subject's underlying port was a cross connect port,
                     // advertise cross-connect link to Metro-ONOS view
                     advertiseCrossConnectLinks(event.subject());
@@ -423,4 +503,97 @@
             }
         }
     }
+
+    /*
+     * Emits link probes tagged with the big switch's scheme from available
+     * virtual ports. This is run every three seconds.
+     */
+    private class DiscoveryTask implements Runnable {
+
+        @Override
+        public void run() {
+            // fingerprint shouldn't change so if we get it once, it can be stashed
+            if (fingerPrint == null || NO_NAME.equals(fingerPrint)) {
+                ClusterMetadata mData = metadataService.getClusterMetadata();
+                fingerPrint = mData == null ? NO_NAME : mData.getName();
+            }
+            bigSwitchService.getPorts().forEach(p -> {
+                // ID of big switch contains schema, so we're good
+                ONOSLLDP lldp = ONOSLLDP.fingerprintedLLDP(bigSwitch.id().toString(),
+                                                           bigSwitch.chassisId(),
+                                                           (int) p.portNumber().toLong(),
+                                                           fingerPrint);
+                ethPacket.setPayload(lldp);
+
+                // recover physical connect point
+                ConnectPoint real = ConnectPoint.deviceConnectPoint(
+                        p.annotations().value(BigSwitchManager.REALIZED_BY));
+                LOG.debug("sending probe for {}/{} through {}", bigSwitch.id(), p.portNumber(), real.toString());
+                packetService.emit(new DefaultOutboundPacket(real.deviceId(),
+                                                            builder().setOutput(real.port()).build(),
+                                                            ByteBuffer.wrap(ethPacket.serialize())));
+            });
+        }
+
+    }
+
+    private class InternalPacketProcessor implements PacketProcessor {
+
+        @Override
+        public void process(PacketContext context) {
+            Ethernet eth = context.inPacket().parsed();
+            if (eth == null) {
+                return;
+            }
+            ONOSLLDP probe = ONOSLLDP.parseONOSLLDP(eth);
+            if (probe == null) {
+                return;
+            }
+            if (isValidProbe(probe)) {
+                /*
+                 * build and pass a linkDesription to the metro domain, which
+                 * will map out a virtual link between the two big switches.
+                 */
+                PortNumber srcPort = PortNumber.portNumber(probe.getPort());
+                DeviceId srcDev = DeviceId.deviceId(probe.getDeviceString());
+                ConnectPoint src = new ConnectPoint(srcDev, srcPort);
+                // receiver-side: some assembly required.
+                PortNumber dstPort = bigSwitchService.getPort(context.inPacket().receivedFrom());
+                ConnectPoint dst = new ConnectPoint(bigSwitch.id(), dstPort);
+
+                LOG.debug("recvd link: {}->{}", src, dst);
+                linkProviderService.linkDetected(new DefaultLinkDescription(src, dst, Link.Type.VIRTUAL));
+            }
+        }
+
+        /*
+         * true for probes from other controller clusters, that are
+         * fingerprinted. Possibly also check that the other end's connect
+         * point is that of a big switch.
+         */
+        private boolean isValidProbe(ONOSLLDP probe) {
+            LLDPOrganizationalTLV tlv = probe.getDomainTLV();
+            if (tlv == null) {
+                return false;
+            }
+            ClusterMetadata mdata = metadataService.getClusterMetadata();
+            if (mdata == null) {
+                LOG.warn("In transient state, ignoring probes until I'm sane");
+                return false;
+            }
+            String us = mdata.getName();
+            String them = probe.getDomainString();
+            if (NO_NAME.equals(them)) {
+                LOG.warn("Cluster for {} in transient state; ignoring", probe.getDeviceString());
+                return false;
+            }
+            if (us.equals(them)) {
+                LOG.debug("Got LLDP from myself (name={})", us);
+                return false;
+            } else {
+                // only want probes from a virtualized domain edge, i.e. a bigswitch
+                return probe.getDeviceString().contains("bigswitch") ? true : false;
+            }
+        }
+    }
 }
diff --git a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchManager.java b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchManager.java
index 948d7a9..64371ee 100644
--- a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchManager.java
+++ b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchManager.java
@@ -84,6 +84,7 @@
 
     // Holds the physical port to virtual port number mapping
     private ConsistentMap<ConnectPoint, Long> portMap;
+
     // Counter for virtual port numbers
     private AtomicCounter portCounter;
     // TODO: Add other listeners once we decide what an edge really is
@@ -123,6 +124,12 @@
                 .collect(Collectors.toList());
     }
 
+    @Override
+    public PortNumber getPort(ConnectPoint port) {
+        // XXX error-check and seriously think about a better method definition.
+        return PortNumber.portNumber(portMap.get(port).value());
+    }
+
     /**
      * Convert connect point to port description.
      *
diff --git a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchService.java b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchService.java
index 0997e61..291f238 100644
--- a/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchService.java
+++ b/ecord/co/src/main/java/org/onosproject/ecord/co/BigSwitchService.java
@@ -16,6 +16,8 @@
 package org.onosproject.ecord.co;
 
 import org.onosproject.event.ListenerService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.PortDescription;
 
 import java.util.List;
@@ -30,4 +32,12 @@
      * @return list of port descriptions
      */
     List<PortDescription> getPorts();
+
+    /**
+     * Get the big switch port mapped to the physical port.
+     *
+     * @param port the physical port
+     * @return virtual port number
+     */
+    PortNumber getPort(ConnectPoint port);
 }