Moving Source from connect point to HostId in MulticastHandling

Change-Id: Ie8f678e150b7ee388680b8d8f27df0bce60ec01f
diff --git a/src/main/java/org/onosproject/t3/impl/McastGenerator.java b/src/main/java/org/onosproject/t3/impl/McastGenerator.java
index 59d5ba6..a5022a2 100644
--- a/src/main/java/org/onosproject/t3/impl/McastGenerator.java
+++ b/src/main/java/org/onosproject/t3/impl/McastGenerator.java
@@ -63,25 +63,27 @@
         mcastService.getRoutes().forEach(route -> {
             McastRouteData routeData = mcastService.routeData(route);
             IpAddress group = route.group();
-            routeData.sources().forEach(source -> {
-                TrafficSelector.Builder selector = DefaultTrafficSelector.builder()
-                        .matchVlanId(vlanId)
-                        .matchInPort(source.port());
-                if (group.isIp4()) {
-                    selector.matchEthDst(IPV4_ADDRESS)
-                            .matchIPDst(group.toIpPrefix())
-                            .matchEthType(EthType.EtherType.IPV4.ethType().toShort());
-                } else {
-                    selector.matchEthDst(IPV6_ADDRESS)
-                            .matchIPv6Dst(group.toIpPrefix())
-                            .matchEthType(EthType.EtherType.IPV6.ethType().toShort());
-                }
-                try {
-                    yield(ImmutableSet.of(manager.trace(selector.build(), source)));
-                } catch (InterruptedException e) {
-                    log.warn("Interrupted generator", e.getMessage());
-                    log.debug("exception", e);
-                }
+            routeData.sources().forEach((host, sources) -> {
+                sources.forEach(source -> {
+                    TrafficSelector.Builder selector = DefaultTrafficSelector.builder()
+                            .matchVlanId(vlanId)
+                            .matchInPort(source.port());
+                    if (group.isIp4()) {
+                        selector.matchEthDst(IPV4_ADDRESS)
+                                .matchIPDst(group.toIpPrefix())
+                                .matchEthType(EthType.EtherType.IPV4.ethType().toShort());
+                    } else {
+                        selector.matchEthDst(IPV6_ADDRESS)
+                                .matchIPv6Dst(group.toIpPrefix())
+                                .matchEthType(EthType.EtherType.IPV6.ethType().toShort());
+                    }
+                    try {
+                        yield(ImmutableSet.of(manager.trace(selector.build(), source)));
+                    } catch (InterruptedException e) {
+                        log.warn("Interrupted generator", e.getMessage());
+                        log.debug("exception", e);
+                    }
+                });
             });
 
         });