adding igmp query polling to igmp application.

Change-Id: I995336417e11404d96f33cdae96b12202d454dd1

adding SafeRecurringTask

Change-Id: Ie560e61500f85339c296f03ed8684078737edcd1
diff --git a/apps/igmp/pom.xml b/apps/igmp/pom.xml
index 5f5ed13..c6d550d 100644
--- a/apps/igmp/pom.xml
+++ b/apps/igmp/pom.xml
@@ -57,7 +57,6 @@
             <version>${project.version}</version>
         </dependency>
 
-
         <dependency>
             <groupId>org.onosproject</groupId>
             <artifactId>onlab-osgi</artifactId>
@@ -84,36 +83,4 @@
 
     </dependencies>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-bundle-plugin</artifactId>
-                <extensions>true</extensions>
-                <configuration>
-                    <instructions>
-                        <Bundle-SymbolicName>
-                            ${project.groupId}.${project.artifactId}
-                        </Bundle-SymbolicName>
-                        <Import-Package>
-                            org.slf4j,
-                            org.osgi.framework,
-                            com.google.common.*,
-                            org.onlab.packet.*,
-                            org.onosproject.*,
-                        </Import-Package>
-                    </instructions>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
 </project>
diff --git a/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
index a0e2aeb..6b914ca 100644
--- a/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
+++ b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
@@ -25,10 +25,12 @@
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IGMP;
 import org.onlab.packet.IGMPMembership;
+import org.onlab.packet.IGMPQuery;
 import org.onlab.packet.IPv4;
 import org.onlab.packet.Ip4Address;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
+import org.onlab.util.SafeRecurringTask;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.ConnectPoint;
@@ -44,6 +46,7 @@
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.Criteria;
 import org.onosproject.net.flowobjective.DefaultFilteringObjective;
 import org.onosproject.net.flowobjective.FilteringObjective;
@@ -53,6 +56,7 @@
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.mcast.McastRoute;
 import org.onosproject.net.mcast.MulticastRouteService;
+import org.onosproject.net.packet.DefaultOutboundPacket;
 import org.onosproject.net.packet.InboundPacket;
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
@@ -61,10 +65,16 @@
 import org.onosproject.olt.AccessDeviceData;
 import org.slf4j.Logger;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -72,14 +82,29 @@
  */
 @Component(immediate = true)
 public class IgmpSnoop {
+
+
     private final Logger log = getLogger(getClass());
 
+    private static final String DEST_MAC = "01:00:5E:00:00:01";
+    private static final String DEST_IP = "224.0.0.1";
+
+    private static final int DEFAULT_QUERY_PERIOD_SECS = 60;
+    private static final byte DEFAULT_IGMP_RESP_CODE = 0;
     private static final String DEFAULT_MCAST_ADDR = "224.0.0.0/4";
 
     @Property(name = "multicastAddress",
             label = "Define the multicast base range to listen to")
     private String multicastAddress = DEFAULT_MCAST_ADDR;
 
+    @Property(name = "queryPeriod", intValue = DEFAULT_QUERY_PERIOD_SECS,
+            label = "Delay in seconds between successive query runs")
+    private int queryPeriod = DEFAULT_QUERY_PERIOD_SECS;
+
+    @Property(name = "maxRespCode", byteValue = DEFAULT_IGMP_RESP_CODE,
+            label = "Maximum time allowed before sending a responding report")
+    private byte maxRespCode = DEFAULT_IGMP_RESP_CODE;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowObjectiveService flowObjectiveService;
 
@@ -98,6 +123,12 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
+
+    private ScheduledFuture<?> queryTask;
+    private final ScheduledExecutorService queryService =
+            Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/igmp-query",
+                                                                      "membership-query"));
+
     private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
 
     private DeviceListener deviceListener = new InternalDeviceListener();
@@ -120,6 +151,9 @@
             };
 
 
+    private ByteBuffer queryPacket;
+
+
     @Activate
     public void activate() {
         appId = coreService.registerApplication("org.onosproject.igmp");
@@ -149,6 +183,14 @@
 
         deviceService.addListener(deviceListener);
 
+        queryPacket = buildQueryPacket();
+
+        queryTask = queryService.scheduleWithFixedDelay(
+                SafeRecurringTask.wrap(this::querySubscribers),
+                0,
+                queryPeriod,
+                TimeUnit.SECONDS);
+
         log.info("Started");
     }
 
@@ -159,6 +201,8 @@
         deviceService.removeListener(deviceListener);
         networkConfig.removeListener(configListener);
         networkConfig.unregisterConfigFactory(configFactory);
+        queryTask.cancel(true);
+        queryService.shutdownNow();
         log.info("Stopped");
     }
 
@@ -194,19 +238,6 @@
         flowObjectiveService.filter(devId, igmp);
     }
 
-    private void processQuery(IGMP pkt, ConnectPoint location) {
-        // TODO is this the right thing to do for a query?
-        pkt.getGroups().forEach(group -> group.getSources().forEach(src -> {
-
-            McastRoute route = new McastRoute(src,
-                                              group.getGaddr(),
-                                              McastRoute.Type.IGMP);
-            multicastService.add(route);
-            multicastService.addSink(route, location);
-
-        }));
-    }
-
     private void processMembership(IGMP pkt, ConnectPoint location) {
         pkt.getGroups().forEach(group -> {
 
@@ -218,8 +249,8 @@
             IGMPMembership membership = (IGMPMembership) group;
 
             McastRoute route = new McastRoute(IpAddress.valueOf("0.0.0.0"),
-                    group.getGaddr(),
-                    McastRoute.Type.IGMP);
+                                              group.getGaddr(),
+                                              McastRoute.Type.IGMP);
 
             if (membership.getRecordType() == IGMPMembership.MODE_IS_INCLUDE ||
                     membership.getRecordType() == IGMPMembership.CHANGE_TO_INCLUDE_MODE) {
@@ -237,6 +268,44 @@
         });
     }
 
+    private ByteBuffer buildQueryPacket() {
+        IGMP igmp = new IGMP();
+        igmp.setIgmpType(IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY);
+        igmp.setMaxRespCode(maxRespCode);
+
+        IGMPQuery query = new IGMPQuery(IpAddress.valueOf("0.0.0.0"), 0);
+        igmp.addGroup(query);
+
+        IPv4 ip = new IPv4();
+        ip.setDestinationAddress(DEST_IP);
+        ip.setProtocol(IPv4.PROTOCOL_IGMP);
+        ip.setSourceAddress("192.168.1.1");
+        ip.setTtl((byte) 1);
+        ip.setPayload(igmp);
+
+        Ethernet eth = new Ethernet();
+        eth.setDestinationMACAddress(DEST_MAC);
+        eth.setSourceMACAddress("DE:AD:BE:EF:BA:11");
+        eth.setEtherType(Ethernet.TYPE_IPV4);
+
+        eth.setPayload(ip);
+
+        return ByteBuffer.wrap(eth.serialize());
+    }
+
+    private void querySubscribers() {
+        oltData.keySet().stream()
+                .flatMap(did -> deviceService.getPorts(did).stream())
+                .filter(p -> !oltData.get(p.element().id()).uplink().equals(p.number()))
+                .filter(p -> p.isEnabled())
+                .forEach(p -> {
+                    TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                            .setOutput(p.number()).build();
+                    packetService.emit(new DefaultOutboundPacket((DeviceId) p.element().id(),
+                                                                 treatment, queryPacket));
+                });
+    }
+
     /**
      * Packet processor responsible for handling IGMP packets.
      */
@@ -295,14 +364,15 @@
                     break;
 
                 case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
-                    processQuery(igmp, pkt.receivedFrom());
+                    log.debug("Received a membership query {} from {}",
+                              igmp, pkt.receivedFrom());
                     break;
 
                 case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
                 case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
                 case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
                     log.debug("IGMP version 1 & 2 message types are not currently supported. Message type: {}",
-                                      igmp.getIgmpType());
+                              igmp.getIgmpType());
                     break;
                 default:
                     log.debug("Unknown IGMP message type: {}", igmp.getIgmpType());