Send PIM Join/Prune messages based on events from the McastService.
Also change Interface to return a list of addresses rather than a set,
so that applications can rely on the order in which they were configured.
Change-Id: Ie7f62fee507639325ee0a77b8db4088dae34597e
diff --git a/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterface.java b/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterface.java
index ce28908..1d09e35 100644
--- a/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterface.java
+++ b/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterface.java
@@ -22,6 +22,7 @@
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.PIM;
+import org.onlab.packet.pim.PIMAddrUnicast;
import org.onlab.packet.pim.PIMHello;
import org.onlab.packet.pim.PIMHelloOption;
import org.onlab.packet.pim.PIMJoinPrune;
@@ -30,6 +31,7 @@
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.host.InterfaceIpAddress;
+import org.onosproject.net.mcast.McastRoute;
import org.onosproject.net.packet.DefaultOutboundPacket;
import org.onosproject.net.packet.PacketService;
import org.slf4j.Logger;
@@ -37,9 +39,11 @@
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -55,6 +59,9 @@
private final Logger log = getLogger(getClass());
+ private static final int JOIN_PERIOD = 60;
+ private static final double HOLD_TIME_MULTIPLIER = 3.5;
+
private final PacketService packetService;
private Interface onosInterface;
@@ -82,6 +89,8 @@
// A map of all our PIM neighbors keyed on our neighbors IP address
private Map<IpAddress, PIMNeighbor> pimNeighbors = new HashMap<>();
+ private Map<McastRoute, RouteData> routes = new ConcurrentHashMap<>();
+
/**
* Create a PIMInterface from an ONOS Interface.
*
@@ -151,8 +160,8 @@
*
- * @return a set of Ip Addresses on this interface
+ * @return a list of IP addresses on this interface, in configured order
*/
- public Set<InterfaceIpAddress> getIpAddresses() {
- return onosInterface.ipAddresses();
+ public List<InterfaceIpAddress> getIpAddresses() {
+ return onosInterface.ipAddressesList();
}
/**
@@ -161,12 +170,12 @@
- * @return the choosen IP address or null if none
+ * @return the chosen IP address or null if none
*/
public IpAddress getIpAddress() {
- if (onosInterface.ipAddresses().isEmpty()) {
+ if (onosInterface.ipAddressesList().isEmpty()) {
return null;
}
IpAddress ipaddr = null;
- for (InterfaceIpAddress ifipaddr : onosInterface.ipAddresses()) {
+ for (InterfaceIpAddress ifipaddr : onosInterface.ipAddressesList()) {
ipaddr = ifipaddr.ipAddress();
break;
}
@@ -218,6 +227,10 @@
return pimNeighbors.values();
}
+    /**
+     * Returns the multicast routes currently tracked on this interface.
+     *
+     * @return the keys of the route table; this is a view of the underlying
+     *         map, not a snapshot
+     */
+    public Collection<McastRoute> getRoutes() {
+        final Set<McastRoute> tracked = routes.keySet();
+        return tracked;
+    }
+
/**
* Checks whether any of our neighbors have expired, and cleans up their
* state if they have.
@@ -402,6 +415,100 @@
}
+    /**
+     * Starts tracking a multicast route on this interface and immediately
+     * sends a PIM Join for it.
+     *
+     * @param route multicast route to join
+     * @param nextHop IP address used as the upstream address of the Join
+     * @param nextHopMac MAC address stored alongside the next hop
+     */
+    public void addRoute(McastRoute route, IpAddress nextHop, MacAddress nextHopMac) {
+        final RouteData entry = new RouteData(nextHop, nextHopMac);
+
+        // Record the route before announcing it.
+        routes.put(route, entry);
+        sendJoinPrune(route, entry, true);
+    }
+
+    /**
+     * Stops tracking a multicast route and, if it was tracked, sends a PIM
+     * Prune for it.
+     *
+     * @param route multicast route to prune
+     */
+    public void removeRoute(McastRoute route) {
+        final RouteData removed = routes.remove(route);
+        if (removed == null) {
+            // The route was not tracked on this interface; nothing to prune.
+            return;
+        }
+
+        sendJoinPrune(route, removed, false);
+    }
+
+    /**
+     * Re-sends a PIM Join for every tracked route whose last Join/Prune was
+     * sent at least {@code JOIN_PERIOD} seconds ago.
+     */
+    public void sendJoins() {
+        final long refreshIntervalMs = TimeUnit.SECONDS.toMillis(JOIN_PERIOD);
+
+        for (Map.Entry<McastRoute, RouteData> entry : routes.entrySet()) {
+            final boolean due =
+                    entry.getValue().timestamp + refreshIntervalMs <= System.currentTimeMillis();
+            if (due) {
+                sendJoinPrune(entry.getKey(), entry.getValue(), true);
+            }
+        }
+    }
+
+    /**
+     * Builds a PIM Join/Prune message for a single route and emits it out of
+     * this interface's port, addressed to the PIM multicast group.
+     *
+     * <p>If the interface has no usable IPv4 address, nothing is sent and the
+     * route's timestamp is left untouched.
+     *
+     * @param route multicast route the message refers to
+     * @param data per-route state; its timestamp is refreshed after sending
+     * @param join true to send a Join, false to send a Prune
+     */
+    private void sendJoinPrune(McastRoute route, RouteData data, boolean join) {
+        // getIpAddress() returns null when the interface has no addresses.
+        // NOTE(review): getIp4Address() presumably returns null for a
+        // non-IPv4 address - confirm against IpAddress.
+        IpAddress localAddress = getIpAddress();
+        if (localAddress == null || localAddress.getIp4Address() == null) {
+            log.warn("No IPv4 address on interface {}, not sending PIM {} for {}",
+                    onosInterface.connectPoint(), join ? "Join" : "Prune", route);
+            return;
+        }
+
+        // A Join carries a hold time of 3.5 join periods; a Prune carries zero.
+        short holdTime = join ? (short) Math.floor(JOIN_PERIOD * HOLD_TIME_MULTIPLIER) : 0;
+
+        PIMJoinPrune jp = new PIMJoinPrune();
+        jp.addJoinPrune(route.source().toIpPrefix(), route.group().toIpPrefix(), join);
+        jp.setHoldTime(holdTime);
+        jp.setUpstreamAddr(new PIMAddrUnicast(data.ipAddress.toString()));
+
+        PIM pim = new PIM();
+        pim.setPIMType(PIM.TYPE_JOIN_PRUNE_REQUEST);
+        pim.setPayload(jp);
+
+        IPv4 ipv4 = new IPv4();
+        ipv4.setDestinationAddress(PIM.PIM_ADDRESS.getIp4Address().toInt());
+        ipv4.setSourceAddress(localAddress.getIp4Address().toInt());
+        ipv4.setProtocol(IPv4.PROTOCOL_PIM);
+        ipv4.setTtl((byte) 1);
+        ipv4.setDiffServ((byte) 0xc0);
+        ipv4.setPayload(pim);
+
+        Ethernet eth = new Ethernet();
+        eth.setSourceMACAddress(onosInterface.mac());
+        eth.setDestinationMACAddress(MacAddress.valueOf("01:00:5E:00:00:0d"));
+        eth.setEtherType(Ethernet.TYPE_IPV4);
+        eth.setPayload(ipv4);
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setOutput(onosInterface.connectPoint().port())
+                .build();
+
+        packetService.emit(new DefaultOutboundPacket(onosInterface.connectPoint().deviceId(),
+                treatment, ByteBuffer.wrap(eth.serialize())));
+
+        // Record the send time so sendJoins() can tell when a refresh is due.
+        data.timestamp = System.currentTimeMillis();
+    }
+
+    // Prunes are sent with sendJoinPrune(route, data, false); no separate sendPrune() is needed.
+
/**
* Returns a builder for a PIM interface.
*
@@ -514,4 +621,16 @@
}
}
+
+    /**
+     * Per-route state: the next hop the Join/Prune is addressed to and the
+     * time the last Join/Prune for the route was sent.
+     */
+    private static class RouteData {
+        public final IpAddress ipAddress;
+        public final MacAddress macAddress;
+        // volatile: written in sendJoinPrune() and read in sendJoins(), which
+        // may run on different threads (periodic join task vs. route events).
+        public volatile long timestamp;
+
+        /**
+         * Creates route state stamped with the current time.
+         *
+         * @param ip next hop IP address
+         * @param mac next hop MAC address
+         */
+        public RouteData(IpAddress ip, MacAddress mac) {
+            this.ipAddress = ip;
+            this.macAddress = mac;
+            this.timestamp = System.currentTimeMillis();
+        }
+    }
}
diff --git a/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterfaceManager.java b/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterfaceManager.java
index 9f56f10..342ef14 100644
--- a/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterfaceManager.java
+++ b/apps/pim/src/main/java/org/onosproject/pim/impl/PIMInterfaceManager.java
@@ -29,12 +29,22 @@
import org.onosproject.incubator.net.intf.InterfaceListener;
import org.onosproject.incubator.net.intf.InterfaceService;
import org.onosproject.net.ConnectPoint;
+
+import org.onosproject.net.Host;
+
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.host.HostService;
+import org.onosproject.net.mcast.McastEvent;
+import org.onosproject.net.mcast.McastListener;
+import org.onosproject.net.mcast.McastRoute;
+import org.onosproject.net.mcast.MulticastRouteService;
import org.onosproject.net.packet.PacketService;
+import org.onosproject.routing.RouteEntry;
+import org.onosproject.routing.RoutingService;
import org.slf4j.Logger;
import java.util.Map;
@@ -73,6 +83,8 @@
private final int timeoutTaskPeriod = DEFAULT_TASK_PERIOD_MS;
+ private final int joinTaskPeriod = 10000;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@@ -82,13 +94,26 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected InterfaceService interfaceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected HostService hostService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MulticastRouteService multicastRouteService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected RoutingService unicastRoutingService;
+
// Store PIM Interfaces in a map key'd by ConnectPoint
private final Map<ConnectPoint, PIMInterface> pimInterfaces = Maps.newConcurrentMap();
+ private final Map<McastRoute, PIMInterface> routes = Maps.newConcurrentMap();
+
private final InternalNetworkConfigListener configListener =
new InternalNetworkConfigListener();
private final InternalInterfaceListener interfaceListener =
new InternalInterfaceListener();
+ private final InternalMulticastListener multicastListener =
+ new InternalMulticastListener();
private final ConfigFactory<ConnectPoint, PimInterfaceConfig> pimConfigFactory
= new ConfigFactory<ConnectPoint, PimInterfaceConfig>(
@@ -115,6 +140,9 @@
networkConfig.addListener(configListener);
interfaceService.addListener(interfaceListener);
+ multicastRouteService.addListener(multicastListener);
+
+ multicastRouteService.getRoutes().forEach(this::addRoute);
// Schedule the periodic hello sender.
scheduledExecutorService.scheduleAtFixedRate(
@@ -128,6 +156,11 @@
() -> pimInterfaces.values().forEach(PIMInterface::checkNeighborTimeouts)),
0, timeoutTaskPeriod, TimeUnit.MILLISECONDS);
+ scheduledExecutorService.scheduleAtFixedRate(
+ SafeRecurringTask.wrap(
+ () -> pimInterfaces.values().forEach(PIMInterface::sendJoins)),
+ 0, joinTaskPeriod, TimeUnit.MILLISECONDS);
+
log.info("Started");
}
@@ -135,6 +168,7 @@
public void deactivate() {
interfaceService.removeListener(interfaceListener);
networkConfig.removeListener(configListener);
+ multicastRouteService.removeListener(multicastListener);
networkConfig.unregisterConfigFactory(pimConfigFactory);
// Shutdown the periodic hello task.
@@ -202,6 +236,65 @@
return builder.build();
}
+ private void addRoute(McastRoute route) {
+ PIMInterface pimInterface = getSourceInterface(route);
+
+ if (pimInterface == null) {
+ return;
+ }
+
+ routes.put(route, pimInterface);
+ }
+
+ private void removeRoute(McastRoute route) {
+ PIMInterface pimInterface = routes.remove(route);
+
+ if (pimInterface == null) {
+ return;
+ }
+
+ pimInterface.removeRoute(route);
+ }
+
+ private PIMInterface getSourceInterface(McastRoute route) {
+ RouteEntry routeEntry = unicastRoutingService.getLongestMatchableRouteEntry(route.source());
+
+ if (routeEntry == null) {
+ log.warn("No route to source {}", route.source());
+ return null;
+ }
+
+ Interface intf = interfaceService.getMatchingInterface(routeEntry.nextHop());
+
+ if (intf == null) {
+ log.warn("No interface with route to next hop {}", routeEntry.nextHop());
+ return null;
+ }
+
+ PIMInterface pimInterface = pimInterfaces.get(intf.connectPoint());
+
+ if (pimInterface == null) {
+ log.warn("PIM is not enabled on interface {}", intf);
+ return null;
+ }
+
+ Set<Host> hosts = hostService.getHostsByIp(routeEntry.nextHop());
+ Host host = null;
+ for (Host h : hosts) {
+ if (h.vlan().equals(intf.vlan())) {
+ host = h;
+ }
+ }
+ if (host == null) {
+ log.warn("Next hop host entry not found: {}", routeEntry.nextHop());
+ return null;
+ }
+
+ pimInterface.addRoute(route, routeEntry.nextHop(), host.mac());
+
+ return pimInterface;
+ }
+
/**
* Listener for network config events.
*/
@@ -261,4 +354,26 @@
}
}
}
+
+    /**
+     * Listener for multicast route events. Route additions and removals are
+     * forwarded to this manager; all other event types are ignored.
+     */
+    private class InternalMulticastListener implements McastListener {
+        @Override
+        public void event(McastEvent event) {
+            switch (event.type()) {
+                case ROUTE_ADDED:
+                    addRoute(event.subject().route());
+                    return;
+                case ROUTE_REMOVED:
+                    removeRoute(event.subject().route());
+                    return;
+                default:
+                    // SOURCE_ADDED, SINK_ADDED, SINK_REMOVED and any other
+                    // event type need no action here.
+                    return;
+            }
+        }
+    }
}