Send PIM Join/Prune messages based on events from the McastService.

Also change Interface to return a list of addresses rather than a set,
so that applications can rely on the order in which they were configured.

Change-Id: Ie7f62fee507639325ee0a77b8db4088dae34597e
diff --git a/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterface.java b/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterface.java
index ce28908..1d09e35 100644
--- a/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterface.java
+++ b/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterface.java
@@ -22,6 +22,7 @@
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.PIM;
+import org.onlab.packet.pim.PIMAddrUnicast;
 import org.onlab.packet.pim.PIMHello;
 import org.onlab.packet.pim.PIMHelloOption;
 import org.onlab.packet.pim.PIMJoinPrune;
@@ -30,6 +31,7 @@
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.host.InterfaceIpAddress;
+import org.onosproject.net.mcast.McastRoute;
 import org.onosproject.net.packet.DefaultOutboundPacket;
 import org.onosproject.net.packet.PacketService;
 import org.slf4j.Logger;
@@ -37,9 +39,11 @@
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -55,6 +59,9 @@
 
     private final Logger log = getLogger(getClass());
 
+    private static final int JOIN_PERIOD = 60;
+    private static final double HOLD_TIME_MULTIPLIER = 3.5;
+
     private final PacketService packetService;
 
     private Interface onosInterface;
@@ -82,6 +89,8 @@
     // A map of all our PIM neighbors keyed on our neighbors IP address
     private Map<IpAddress, PIMNeighbor> pimNeighbors = new HashMap<>();
 
+    private Map<McastRoute, RouteData> routes = new ConcurrentHashMap<>();
+
     /**
      * Create a PIMInterface from an ONOS Interface.
      *
@@ -151,8 +160,8 @@
      *
-     * @return a set of Ip Addresses on this interface
+     * @return the list of IP addresses on this interface, in configured order
      */
-    public Set<InterfaceIpAddress> getIpAddresses() {
-        return onosInterface.ipAddresses();
+    public List<InterfaceIpAddress> getIpAddresses() {
+        return onosInterface.ipAddressesList();
     }
 
     /**
@@ -161,12 +170,12 @@
-     * @return the choosen IP address or null if none
+     * @return the chosen IP address or null if none
      */
     public IpAddress getIpAddress() {
-        if (onosInterface.ipAddresses().isEmpty()) {
+        if (onosInterface.ipAddressesList().isEmpty()) {
             return null;
         }
 
         IpAddress ipaddr = null;
-        for (InterfaceIpAddress ifipaddr : onosInterface.ipAddresses()) {
+        for (InterfaceIpAddress ifipaddr : onosInterface.ipAddressesList()) {
             ipaddr = ifipaddr.ipAddress();
             break;
         }
@@ -218,6 +227,10 @@
         return pimNeighbors.values();
     }
 
+    public Collection<McastRoute> getRoutes() {
+        return routes.keySet(); // key-set view backed by the routes map, not a copy
+    }
+
     /**
      * Checks whether any of our neighbors have expired, and cleans up their
      * state if they have.
@@ -402,6 +415,100 @@
 
     }
 
+    /**
+     * Adds a multicast route to this interface and immediately sends a PIM
+     * Join for it.
+     *
+     * @param route multicast route to join
+     * @param nextHop IP address of the next hop, used as the upstream address
+     * @param nextHopMac MAC address of the next hop
+     */
+    public void addRoute(McastRoute route, IpAddress nextHop, MacAddress nextHopMac) {
+        RouteData data = new RouteData(nextHop, nextHopMac);
+        routes.put(route, data);
+
+        sendJoinPrune(route, data, true);
+    }
+
+    /**
+     * Removes a multicast route from this interface and, if the route was
+     * present, sends a PIM Prune for it.
+     *
+     * @param route multicast route to prune
+     */
+    public void removeRoute(McastRoute route) {
+        RouteData data = routes.remove(route);
+
+        if (data != null) {
+            sendJoinPrune(route, data, false);
+        }
+    }
+
+    /**
+     * Re-sends a PIM Join for every route whose last Join/Prune was sent at
+     * least JOIN_PERIOD seconds ago.
+     */
+    public void sendJoins() {
+        long periodMillis = TimeUnit.SECONDS.toMillis(JOIN_PERIOD);
+
+        routes.forEach((route, data) -> {
+            if (System.currentTimeMillis() - data.timestamp >= periodMillis) {
+                sendJoinPrune(route, data, true);
+            }
+        });
+    }
+
+    /**
+     * Builds a PIM Join/Prune message for a route and emits it out of the
+     * port of this interface, then records the send time on the route.
+     *
+     * @param route multicast route the message refers to
+     * @param data per-route state holding the upstream next hop
+     * @param join true to send a Join, false to send a Prune
+     */
+    private void sendJoinPrune(McastRoute route, RouteData data, boolean join) {
+        // getIpAddress() returns null when the interface has no addresses; the
+        // packet cannot be sourced without one, so skip rather than throw an NPE.
+        IpAddress sourceIp = getIpAddress();
+        if (sourceIp == null) {
+            log.warn("No IP address on {}, not sending Join/Prune for {}", onosInterface, route);
+            return;
+        }
+
+        PIMJoinPrune jp = new PIMJoinPrune();
+        jp.addJoinPrune(route.source().toIpPrefix(), route.group().toIpPrefix(), join);
+        // A Prune carries a hold time of 0.
+        jp.setHoldTime(join ? (short) Math.floor(JOIN_PERIOD * HOLD_TIME_MULTIPLIER) : 0);
+        jp.setUpstreamAddr(new PIMAddrUnicast(data.ipAddress.toString()));
+
+        PIM pim = new PIM();
+        pim.setPIMType(PIM.TYPE_JOIN_PRUNE_REQUEST);
+        pim.setPayload(jp);
+
+        IPv4 ipv4 = new IPv4();
+        ipv4.setDestinationAddress(PIM.PIM_ADDRESS.getIp4Address().toInt());
+        ipv4.setSourceAddress(sourceIp.getIp4Address().toInt());
+        ipv4.setProtocol(IPv4.PROTOCOL_PIM);
+        ipv4.setTtl((byte) 1);
+        ipv4.setDiffServ((byte) 0xc0);
+        ipv4.setPayload(pim);
+
+        Ethernet eth = new Ethernet();
+        eth.setSourceMACAddress(onosInterface.mac());
+        eth.setDestinationMACAddress(MacAddress.valueOf("01:00:5E:00:00:0d"));
+        eth.setEtherType(Ethernet.TYPE_IPV4);
+        eth.setPayload(ipv4);
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setOutput(onosInterface.connectPoint().port())
+                .build();
+
+        packetService.emit(new DefaultOutboundPacket(onosInterface.connectPoint().deviceId(),
+                treatment, ByteBuffer.wrap(eth.serialize())));
+
+        data.timestamp = System.currentTimeMillis();
+    }
+
     /**
      * Returns a builder for a PIM interface.
      *
@@ -514,4 +621,23 @@
         }
 
     }
+
+    /**
+     * Per-route state kept by this interface: the next hop given when the
+     * route was added, and a timestamp that is set at creation and refreshed
+     * each time a Join/Prune is sent for the route.
+     */
+    private static class RouteData {
+        public final IpAddress ipAddress;
+        public final MacAddress macAddress;
+
+        // Written by sendJoinPrune() and read by sendJoins(), which can run on
+        // different threads; volatile makes each update visible to readers.
+        public volatile long timestamp;
+
+        public RouteData(IpAddress ip, MacAddress mac) {
+            this.ipAddress = ip;
+            this.macAddress = mac;
+            timestamp = System.currentTimeMillis();
+        }
+    }
 }
diff --git a/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterfaceManager.java b/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterfaceManager.java
index 9f56f10..342ef14 100644
--- a/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterfaceManager.java
+++ b/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterfaceManager.java
@@ -29,12 +29,22 @@
 import org.onosproject.incubator.net.intf.InterfaceListener;
 import org.onosproject.incubator.net.intf.InterfaceService;
 import org.onosproject.net.ConnectPoint;
+
+import org.onosproject.net.Host;
+
 import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigEvent;
 import org.onosproject.net.config.NetworkConfigListener;
 import org.onosproject.net.config.NetworkConfigRegistry;
 import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.host.HostService;
+import org.onosproject.net.mcast.McastEvent;
+import org.onosproject.net.mcast.McastListener;
+import org.onosproject.net.mcast.McastRoute;
+import org.onosproject.net.mcast.MulticastRouteService;
 import org.onosproject.net.packet.PacketService;
+import org.onosproject.routing.RouteEntry;
+import org.onosproject.routing.RoutingService;
 import org.slf4j.Logger;
 
 import java.util.Map;
@@ -73,6 +83,8 @@
 
     private final int timeoutTaskPeriod = DEFAULT_TASK_PERIOD_MS;
 
+    private final int joinTaskPeriod = 10000;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PacketService packetService;
 
@@ -82,13 +94,26 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected InterfaceService interfaceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected HostService hostService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MulticastRouteService multicastRouteService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected RoutingService unicastRoutingService;
+
     // Store PIM Interfaces in a map key'd by ConnectPoint
     private final Map<ConnectPoint, PIMInterface> pimInterfaces = Maps.newConcurrentMap();
 
+    private final Map<McastRoute, PIMInterface> routes = Maps.newConcurrentMap();
+
     private final InternalNetworkConfigListener configListener =
             new InternalNetworkConfigListener();
     private final InternalInterfaceListener interfaceListener =
             new InternalInterfaceListener();
+    private final InternalMulticastListener multicastListener =
+            new InternalMulticastListener();
 
     private final ConfigFactory<ConnectPoint, PimInterfaceConfig> pimConfigFactory
             = new ConfigFactory<ConnectPoint, PimInterfaceConfig>(
@@ -115,6 +140,9 @@
 
         networkConfig.addListener(configListener);
         interfaceService.addListener(interfaceListener);
+        multicastRouteService.addListener(multicastListener);
+
+        multicastRouteService.getRoutes().forEach(this::addRoute);
 
         // Schedule the periodic hello sender.
         scheduledExecutorService.scheduleAtFixedRate(
@@ -128,6 +156,11 @@
                         () -> pimInterfaces.values().forEach(PIMInterface::checkNeighborTimeouts)),
                 0, timeoutTaskPeriod, TimeUnit.MILLISECONDS);
 
+        scheduledExecutorService.scheduleAtFixedRate(
+                SafeRecurringTask.wrap(
+                        () -> pimInterfaces.values().forEach(PIMInterface::sendJoins)),
+                0, joinTaskPeriod, TimeUnit.MILLISECONDS);
+
         log.info("Started");
     }
 
@@ -135,6 +168,7 @@
     public void deactivate() {
         interfaceService.removeListener(interfaceListener);
         networkConfig.removeListener(configListener);
+        multicastRouteService.removeListener(multicastListener);
         networkConfig.unregisterConfigFactory(pimConfigFactory);
 
         // Shutdown the periodic hello task.
@@ -202,6 +236,65 @@
         return builder.build();
     }
 
+    private void addRoute(McastRoute route) {
+        // getSourceInterface() also installs the route on the interface it
+        // returns (which sends the initial Join); here we only remember the
+        // route-to-interface binding so that removeRoute() can find it.
+        PIMInterface upstream = getSourceInterface(route);
+        if (upstream != null) {
+            routes.put(route, upstream);
+        }
+    }
+
+    private void removeRoute(McastRoute route) {
+        // Routes that never resolved to a PIM interface have no binding, and
+        // therefore nothing to prune.
+        PIMInterface upstream = routes.remove(route);
+        if (upstream != null) {
+            // The interface forgets the route and sends the Prune.
+            upstream.removeRoute(route);
+        }
+    }
+
+    private PIMInterface getSourceInterface(McastRoute route) {
+        // Find the unicast route that leads back to the multicast source.
+        RouteEntry towardsSource = unicastRoutingService.getLongestMatchableRouteEntry(route.source());
+        if (towardsSource == null) {
+            log.warn("No route to source {}", route.source());
+            return null;
+        }
+
+        // Find the configured interface that matches the route's next hop.
+        Interface intf = interfaceService.getMatchingInterface(towardsSource.nextHop());
+        if (intf == null) {
+            log.warn("No interface with route to next hop {}", towardsSource.nextHop());
+            return null;
+        }
+
+        // PIM has to be running on that interface's connect point.
+        PIMInterface upstream = pimInterfaces.get(intf.connectPoint());
+        if (upstream == null) {
+            log.warn("PIM is not enabled on interface {}", intf);
+            return null;
+        }
+
+        // Pick the next hop's host entry on the interface VLAN (last match wins).
+        Host nextHopHost = null;
+        for (Host candidate : hostService.getHostsByIp(towardsSource.nextHop())) {
+            if (candidate.vlan().equals(intf.vlan())) {
+                nextHopHost = candidate;
+            }
+        }
+        if (nextHopHost == null) {
+            log.warn("Next hop host entry not found: {}", towardsSource.nextHop());
+            return null;
+        }
+
+        // Side effect: register the route on the interface before returning it.
+        upstream.addRoute(route, towardsSource.nextHop(), nextHopHost.mac());
+        return upstream;
+    }
+
     /**
      * Listener for network config events.
      */
@@ -261,4 +354,26 @@
             }
         }
     }
+
+    /**
+     * Listener for multicast route events; only route add/remove is acted on.
+     */
+    private class InternalMulticastListener implements McastListener {
+        @Override
+        public void event(McastEvent event) {
+            switch (event.type()) {
+            case ROUTE_ADDED: // bind the route to a PIM interface via addRoute()
+                addRoute(event.subject().route());
+                break;
+            case ROUTE_REMOVED: // unbind the route via removeRoute()
+                removeRoute(event.subject().route());
+                break;
+            case SOURCE_ADDED: // no action is taken for source and sink events
+            case SINK_ADDED:
+            case SINK_REMOVED:
+            default:
+                break;
+            }
+        }
+    }
 }