adding igmp query polling to igmp application.
Change-Id: I995336417e11404d96f33cdae96b12202d454dd1
adding SafeRecurringTask
Change-Id: Ie560e61500f85339c296f03ed8684078737edcd1
diff --git a/apps/igmp/pom.xml b/apps/igmp/pom.xml
index 5f5ed13..c6d550d 100644
--- a/apps/igmp/pom.xml
+++ b/apps/igmp/pom.xml
@@ -57,7 +57,6 @@
<version>${project.version}</version>
</dependency>
-
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
@@ -84,36 +83,4 @@
</dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <extensions>true</extensions>
- <configuration>
- <instructions>
- <Bundle-SymbolicName>
- ${project.groupId}.${project.artifactId}
- </Bundle-SymbolicName>
- <Import-Package>
- org.slf4j,
- org.osgi.framework,
- com.google.common.*,
- org.onlab.packet.*,
- org.onosproject.*,
- </Import-Package>
- </instructions>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
</project>
diff --git a/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
index a0e2aeb..6b914ca 100644
--- a/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
+++ b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
@@ -25,10 +25,12 @@
import org.onlab.packet.Ethernet;
import org.onlab.packet.IGMP;
import org.onlab.packet.IGMPMembership;
+import org.onlab.packet.IGMPQuery;
import org.onlab.packet.IPv4;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
+import org.onlab.util.SafeRecurringTask;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.ConnectPoint;
@@ -44,6 +46,7 @@
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criteria;
import org.onosproject.net.flowobjective.DefaultFilteringObjective;
import org.onosproject.net.flowobjective.FilteringObjective;
@@ -53,6 +56,7 @@
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.mcast.McastRoute;
import org.onosproject.net.mcast.MulticastRouteService;
+import org.onosproject.net.packet.DefaultOutboundPacket;
import org.onosproject.net.packet.InboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
@@ -61,10 +65,16 @@
import org.onosproject.olt.AccessDeviceData;
import org.slf4j.Logger;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -72,14 +82,29 @@
*/
@Component(immediate = true)
public class IgmpSnoop {
+
+
private final Logger log = getLogger(getClass());
+ private static final String DEST_MAC = "01:00:5E:00:00:01";
+ private static final String DEST_IP = "224.0.0.1";
+
+ private static final int DEFAULT_QUERY_PERIOD_SECS = 60;
+ private static final byte DEFAULT_IGMP_RESP_CODE = 0;
private static final String DEFAULT_MCAST_ADDR = "224.0.0.0/4";
@Property(name = "multicastAddress",
label = "Define the multicast base range to listen to")
private String multicastAddress = DEFAULT_MCAST_ADDR;
+ @Property(name = "queryPeriod", intValue = DEFAULT_QUERY_PERIOD_SECS,
+ label = "Delay in seconds between successive query runs")
+ private int queryPeriod = DEFAULT_QUERY_PERIOD_SECS;
+
+ @Property(name = "maxRespCode", byteValue = DEFAULT_IGMP_RESP_CODE,
+ label = "Maximum time allowed before sending a responding report")
+ private byte maxRespCode = DEFAULT_IGMP_RESP_CODE;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowObjectiveService flowObjectiveService;
@@ -98,6 +123,12 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
+
+ private ScheduledFuture<?> queryTask;
+ private final ScheduledExecutorService queryService =
+ Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/igmp-query",
+ "membership-query"));
+
private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
private DeviceListener deviceListener = new InternalDeviceListener();
@@ -120,6 +151,9 @@
};
+ private ByteBuffer queryPacket;
+
+
@Activate
public void activate() {
appId = coreService.registerApplication("org.onosproject.igmp");
@@ -149,6 +183,14 @@
deviceService.addListener(deviceListener);
+ queryPacket = buildQueryPacket();
+
+ queryTask = queryService.scheduleWithFixedDelay(
+ SafeRecurringTask.wrap(this::querySubscribers),
+ 0,
+ queryPeriod,
+ TimeUnit.SECONDS);
+
log.info("Started");
}
@@ -159,6 +201,8 @@
deviceService.removeListener(deviceListener);
networkConfig.removeListener(configListener);
networkConfig.unregisterConfigFactory(configFactory);
+ queryTask.cancel(true);
+ queryService.shutdownNow();
log.info("Stopped");
}
@@ -194,19 +238,6 @@
flowObjectiveService.filter(devId, igmp);
}
- private void processQuery(IGMP pkt, ConnectPoint location) {
- // TODO is this the right thing to do for a query?
- pkt.getGroups().forEach(group -> group.getSources().forEach(src -> {
-
- McastRoute route = new McastRoute(src,
- group.getGaddr(),
- McastRoute.Type.IGMP);
- multicastService.add(route);
- multicastService.addSink(route, location);
-
- }));
- }
-
private void processMembership(IGMP pkt, ConnectPoint location) {
pkt.getGroups().forEach(group -> {
@@ -218,8 +249,8 @@
IGMPMembership membership = (IGMPMembership) group;
McastRoute route = new McastRoute(IpAddress.valueOf("0.0.0.0"),
- group.getGaddr(),
- McastRoute.Type.IGMP);
+ group.getGaddr(),
+ McastRoute.Type.IGMP);
if (membership.getRecordType() == IGMPMembership.MODE_IS_INCLUDE ||
membership.getRecordType() == IGMPMembership.CHANGE_TO_INCLUDE_MODE) {
@@ -237,6 +268,44 @@
});
}
+ private ByteBuffer buildQueryPacket() {
+ IGMP igmp = new IGMP();
+ igmp.setIgmpType(IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY);
+ igmp.setMaxRespCode(maxRespCode);
+
+ IGMPQuery query = new IGMPQuery(IpAddress.valueOf("0.0.0.0"), 0);
+ igmp.addGroup(query);
+
+ IPv4 ip = new IPv4();
+ ip.setDestinationAddress(DEST_IP);
+ ip.setProtocol(IPv4.PROTOCOL_IGMP);
+ ip.setSourceAddress("192.168.1.1");
+ ip.setTtl((byte) 1);
+ ip.setPayload(igmp);
+
+ Ethernet eth = new Ethernet();
+ eth.setDestinationMACAddress(DEST_MAC);
+ eth.setSourceMACAddress("DE:AD:BE:EF:BA:11");
+ eth.setEtherType(Ethernet.TYPE_IPV4);
+
+ eth.setPayload(ip);
+
+ return ByteBuffer.wrap(eth.serialize());
+ }
+
+ private void querySubscribers() {
+ oltData.keySet().stream()
+ .flatMap(did -> deviceService.getPorts(did).stream())
+ .filter(p -> !oltData.get(p.element().id()).uplink().equals(p.number()))
+ .filter(p -> p.isEnabled())
+ .forEach(p -> {
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .setOutput(p.number()).build();
+ packetService.emit(new DefaultOutboundPacket((DeviceId) p.element().id(),
+ treatment, queryPacket));
+ });
+ }
+
/**
* Packet processor responsible for handling IGMP packets.
*/
@@ -295,14 +364,15 @@
break;
case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
- processQuery(igmp, pkt.receivedFrom());
+ log.debug("Received a membership query {} from {}",
+ igmp, pkt.receivedFrom());
break;
case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
log.debug("IGMP version 1 & 2 message types are not currently supported. Message type: {}",
- igmp.getIgmpType());
+ igmp.getIgmpType());
break;
default:
log.debug("Unknown IGMP message type: {}", igmp.getIgmpType());